import gradio as gr
import requests
import folium
import pandas as pd
import time
import os
import zipfile
import io
from typing import Dict, List, Tuple
from datetime import datetime, timedelta
import pytz


class AccurateAirQualityMapper:
    """Air Quality Mapper with precise EPA coordinates"""

    def __init__(self):
        self.airnow_base_url = "https://files.airnowtech.org"
        self.epa_base_url = "https://aqs.epa.gov/aqsweb/airdata"
        self.aqi_colors = {
            "Good": "#00E400",
            "Moderate": "#FFFF00",
            "Unhealthy for Sensitive Groups": "#FF7E00",
            "Unhealthy": "#FF0000",
            "Very Unhealthy": "#8F3F97",
            "Hazardous": "#7E0023"
        }
        self.aqi_ranges = {
            (0, 50): "Good",
            (51, 100): "Moderate",
            (101, 150): "Unhealthy for Sensitive Groups",
            (151, 200): "Unhealthy",
            (201, 300): "Very Unhealthy",
            (301, 500): "Hazardous"
        }
        # Cache for coordinate lookups
        self.coordinate_cache = {}

    def download_epa_coordinates(self) -> Dict[str, Tuple[float, float]]:
        """Download EPA monitor coordinates and create lookup dictionary"""
        print("πŸ—ΊοΈ Downloading EPA monitor coordinates...")
        coordinates = {}

        try:
            # Download monitor listing (most comprehensive)
            monitors_url = f"{self.epa_base_url}/aqs_monitors.zip"
            print(f"Downloading: {monitors_url}")
            response = requests.get(monitors_url, timeout=60)

            if response.status_code == 200:
                # Extract CSV from ZIP
                with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                    csv_filename = z.namelist()[0]  # Should be monitors.csv
                    with z.open(csv_filename) as f:
                        # Read CSV with pandas
                        df = pd.read_csv(f)

                print(f"πŸ“Š Loaded {len(df)} monitor records")
                print(f"Columns: {list(df.columns)}")

                # Create lookup by AQS ID (State + County + Site)
                for _, row in df.iterrows():
                    try:
                        # Build AQS ID from components
                        state_code = str(row.get('State Code', '')).zfill(2)
                        county_code = str(row.get('County Code', '')).zfill(3)
                        site_number = str(row.get('Site Number', '')).zfill(4)
                        aqs_id = f"{state_code}{county_code}{site_number}"

                        # Get coordinates
                        lat = float(row.get('Latitude', 0))
                        lon = float(row.get('Longitude', 0))

                        if lat != 0 and lon != 0 and aqs_id != "000000000":
                            coordinates[aqs_id] = (lat, lon)
                    except (ValueError, TypeError):
                        continue

                print(f"βœ… Created coordinate lookup for {len(coordinates)} stations")
            else:
                print(f"❌ Failed to download monitors: HTTP {response.status_code}")

        except Exception as e:
            print(f"❌ Error downloading EPA coordinates: {str(e)}")

        # Fallback: try the sites file if we didn't get enough coordinates
        if len(coordinates) < 1000:
            try:
                print("πŸ”„ Trying sites file as backup...")
                sites_url = f"{self.epa_base_url}/aqs_sites.zip"
                response = requests.get(sites_url, timeout=60)

                if response.status_code == 200:
                    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                        csv_filename = z.namelist()[0]
                        with z.open(csv_filename) as f:
                            df = pd.read_csv(f)

                    for _, row in df.iterrows():
                        try:
                            state_code = str(row.get('State Code', '')).zfill(2)
                            county_code = str(row.get('County Code', '')).zfill(3)
                            site_number = str(row.get('Site Number', '')).zfill(4)
                            aqs_id = f"{state_code}{county_code}{site_number}"

                            lat = float(row.get('Latitude', 0))
                            lon = float(row.get('Longitude', 0))

                            if lat != 0 and lon != 0 and aqs_id not in coordinates:
                                coordinates[aqs_id] = (lat, lon)
                        except (ValueError, TypeError):
                            continue

                    print(f"βœ… Added {len(coordinates)} total coordinates")

            except Exception as e:
                print(f"❌ Error with sites backup: {str(e)}")

        self.coordinate_cache = coordinates
        return coordinates
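
    # NOTE: The two helpers below approximate EPA's piecewise-linear AQI formula,
    #   AQI = (I_hi - I_lo) / (C_hi - C_lo) * (C - C_lo) + I_lo,
    # using hard-coded slopes for each pollutant's breakpoint range. For example,
    # a PM2.5 value of 35.4 maps to int(51 + (49 / 23.4) * (35.4 - 12)) = 100.
    # This is a simplified sketch of the AQI tables, not the full regulatory
    # calculation; parameters without breakpoints here (NO2, SO2, CO) return 0.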

    def get_aqi_category(self, aqi_value: int) -> str:
        """Get AQI category based on value"""
        for (min_val, max_val), category in self.aqi_ranges.items():
            if min_val <= aqi_value <= max_val:
                return category
        return "Unknown"

    def calculate_aqi(self, parameter: str, value: float) -> int:
        """Calculate AQI for common parameters"""
        if parameter == 'OZONE' and value > 0:
            if value <= 54:
                return int((50 / 54) * value)
            elif value <= 70:
                return int(51 + (49 / 16) * (value - 54))
            elif value <= 85:
                return int(101 + (49 / 15) * (value - 70))
            elif value <= 105:
                return int(151 + (49 / 20) * (value - 85))
            else:
                return int(201 + (199 / 95) * min(value - 105, 95))
        elif parameter == 'PM2.5' and value >= 0:
            if value <= 12.0:
                return int((50 / 12) * value)
            elif value <= 35.4:
                return int(51 + (49 / 23.4) * (value - 12))
            elif value <= 55.4:
                return int(101 + (49 / 20) * (value - 35.4))
            elif value <= 150.4:
                return int(151 + (49 / 95) * (value - 55.4))
            else:
                return int(201 + (199 / 149.6) * min(value - 150.4, 149.6))
        elif parameter == 'PM10' and value >= 0:
            if value <= 54:
                return int((50 / 54) * value)
            elif value <= 154:
                return int(51 + (49 / 100) * (value - 54))
            elif value <= 254:
                return int(101 + (49 / 100) * (value - 154))
            elif value <= 354:
                return int(151 + (49 / 100) * (value - 254))
            else:
                return int(201 + (199 / 146) * min(value - 354, 146))
        return 0

    def fetch_airnow_bulk_data(self) -> Tuple[List[Dict], str]:
        """Fetch current AirNow bulk data"""
        print("🎯 Fetching AirNow bulk data...")

        try:
            # Get current GMT time
            gmt_now = datetime.now(pytz.UTC)

            # Try current hour and previous few hours
            for hour_offset in range(0, 6):
                try:
                    target_time = gmt_now - timedelta(hours=hour_offset)
                    filename = f"HourlyData_{target_time.strftime('%Y%m%d%H')}.dat"
                    url = f"{self.airnow_base_url}/airnow/today/{filename}"
                    print(f"πŸ” Trying: {url}")

                    response = requests.get(url, timeout=30)
                    if response.status_code == 200 and response.text.strip():
                        print(f"βœ… Found data file with {len(response.text.splitlines())} lines")

                        # Parse the data
                        data = self.parse_hourly_data_file(response.text)
                        if data:
                            print(f"πŸ“Š Parsed {len(data)} station records")
                            return data, f"βœ… SUCCESS: {len(data)} monitoring stations from {filename}"

                except Exception as e:
                    print(f"❌ Error trying hour {hour_offset}: {str(e)}")
                    continue

                time.sleep(0.1)

            return [], "❌ No recent data files found"

        except Exception as e:
            return [], f"❌ Error fetching bulk data: {str(e)}"

    def parse_hourly_data_file(self, text: str) -> List[Dict]:
        """Parse AirNow hourly data format"""
        lines = text.strip().split('\n')
        data = []

        # Download coordinates if not cached
        if not self.coordinate_cache:
            self.download_epa_coordinates()

        for line in lines:
            if not line.strip():
                continue

            try:
                fields = line.split('|')
                if len(fields) >= 9:
                    aqs_id = fields[2]  # AQS ID from file

                    # Look up coordinates using the first 9 chars (state+county+site)
                    lat, lon = self.coordinate_cache.get(aqs_id[:9], (0, 0))

                    # Skip if no coordinates found
                    if lat == 0 and lon == 0:
                        continue

                    try:
                        value = float(fields[7])
                    except ValueError:
                        value = 0
                    parameter = fields[5]

                    # Only include air quality parameters
                    if parameter not in ['OZONE', 'PM2.5', 'PM10', 'NO2', 'SO2', 'CO']:
                        continue

                    aqi = self.calculate_aqi(parameter, value)

                    record = {
                        'DateObserved': fields[0],
                        'HourObserved': fields[1],
                        'AQSID': aqs_id,
                        'SiteName': fields[3],
                        'ParameterName': parameter,
                        'ReportingUnits': fields[6],
                        'Value': value,
                        'DataSource': fields[8] if len(fields) > 8 else '',
                        'Latitude': lat,
                        'Longitude': lon,
                        'AQI': aqi,
                        'Category': {'Name': self.get_aqi_category(aqi)},
                        'ReportingArea': fields[3],
                        'StateCode': aqs_id[:2] if len(aqs_id) >= 2 else 'US'
                    }
                    data.append(record)

            except Exception:
                continue

        print(f"βœ… Found coordinates for {len(data)} stations")
        return data
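
    # NOTE: parse_hourly_data_file() above assumes the pipe-delimited layout of
    # AirNow's HourlyData_YYYYMMDDHH.dat files and reads fields by position:
    #   0 date | 1 hour (GMT) | 2 AQS ID | 3 site name | 5 parameter
    #   | 6 units | 7 value | 8 data source (field 4 is not used).
    # If AirNow changes that layout, these indices would need to be updated.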

    def create_map(self, data: List[Dict]) -> str:
        """Create interactive map with accurate coordinates"""
        if not data:
            m = folium.Map(location=[39.8283, -98.5795], zoom_start=4)
            folium.Marker(
                [39.8283, -98.5795],
                popup="No air quality data available.",
                icon=folium.Icon(color='red', icon='info-sign')
            ).add_to(m)
            return m._repr_html_()

        # Calculate center
        lats = [item['Latitude'] for item in data]
        lons = [item['Longitude'] for item in data]
        center_lat = sum(lats) / len(lats)
        center_lon = sum(lons) / len(lons)

        # Create map
        m = folium.Map(location=[center_lat, center_lon], zoom_start=4)

        # Add markers
        for item in data:
            try:
                lat = item['Latitude']
                lon = item['Longitude']
                aqi = item['AQI']
                parameter = item['ParameterName']
                site_name = item['SiteName']
                value = item['Value']
                units = item['ReportingUnits']
                category = item['Category']['Name']

                # Create popup
                popup_content = f"""
                <div style="width: 250px;">
                    <h4>{site_name}</h4>
                    <b>Parameter:</b> {parameter}<br>
                    <b>Value:</b> {value} {units}<br>
                    <b>AQI:</b> {aqi} ({category})<br>
                    <b>Coordinates:</b> {lat:.4f}, {lon:.4f}<br>
                    <b>Time:</b> {item['DateObserved']} {item['HourObserved']}:00 GMT<br>
                    <b>Station ID:</b> {item['AQSID']}
                </div>
                """

                # Color based on AQI
                if aqi <= 50:
                    marker_color = 'green'
                elif aqi <= 100:
                    marker_color = 'orange'
                elif aqi <= 150:
                    marker_color = 'orange'
                elif aqi <= 200:
                    marker_color = 'red'
                elif aqi <= 300:
                    marker_color = 'purple'
                else:
                    marker_color = 'darkred'

                # Add marker
                folium.Marker(
                    [lat, lon],
                    popup=folium.Popup(popup_content, max_width=300),
                    tooltip=f"{site_name}: {parameter} = {value} {units} (AQI: {aqi})",
                    icon=folium.Icon(color=marker_color, icon='cloud')
                ).add_to(m)

            except Exception:
                continue

        # Add legend
        legend_html = """
        <div style="position: fixed; bottom: 30px; left: 30px; z-index: 9999;
                    background-color: white; padding: 10px; border: 2px solid grey;
                    border-radius: 5px; font-size: 14px;">
            <b>AQI Legend</b><br>
            <span style="color: #00E400;">&#9632;</span> Good (0-50)<br>
            <span style="color: #FFFF00;">&#9632;</span> Moderate (51-100)<br>
            <span style="color: #FF7E00;">&#9632;</span> Unhealthy for Sensitive (101-150)<br>
            <span style="color: #FF0000;">&#9632;</span> Unhealthy (151-200)<br>
            <span style="color: #8F3F97;">&#9632;</span> Very Unhealthy (201-300)<br>
            <span style="color: #7E0023;">&#9632;</span> Hazardous (301+)
        </div>
        """
        m.get_root().html.add_child(folium.Element(legend_html))

        return m._repr_html_()

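    # NOTE: create_map() above returns the Folium map as an HTML string via
    # _repr_html_(), which is what the gr.HTML output below expects. Marker
    # colors are limited to folium's standard icon palette (which has no
    # yellow), so both the Moderate and Sensitive-Groups ranges use 'orange'.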

    def create_data_table(self, data: List[Dict]) -> pd.DataFrame:
        """Create data table"""
        if not data:
            return pd.DataFrame()

        table_data = []
        for item in data:
            table_data.append({
                'Site Name': item['SiteName'],
                'State': item['StateCode'],
                'Parameter': item['ParameterName'],
                'Value': item['Value'],
                'Units': item['ReportingUnits'],
                'AQI': item['AQI'],
                'Category': item['Category']['Name'],
                'Latitude': round(item['Latitude'], 4),
                'Longitude': round(item['Longitude'], 4),
                'Date': item['DateObserved'],
                'Hour (GMT)': item['HourObserved'],
                'Station ID': item['AQSID']
            })

        df = pd.DataFrame(table_data)
        return df.sort_values('AQI', ascending=False)


# Initialize mapper
mapper = AccurateAirQualityMapper()


def update_map():
    """Update map with accurate coordinates"""
    print("πŸš€ Starting accurate air quality mapping...")

    # Fetch data
    data, status = mapper.fetch_airnow_bulk_data()

    # Create map
    map_html = mapper.create_map(data)

    # Create table
    df = mapper.create_data_table(data)

    return map_html, df, status


# Create Gradio interface
with gr.Blocks(title="Accurate AirNow Sensor Map", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🎯 Accurate AirNow Air Quality Map

        **βœ… PRECISE COORDINATES** - Uses EPA's official monitor coordinate database!

        This map displays real-time air quality data with **accurate station locations** by:

        1. **Downloading EPA coordinates**: Gets precise lat/lon for every monitoring station
        2. **Fetching AirNow bulk data**: Current hourly readings from 2,000+ stations
        3. **Accurate mapping**: Stations plotted at their exact geographic locations

        ## Key Features:
        - 🎯 **Precise Locations**: EPA's official coordinate database
        - 🌍 **Complete Coverage**: All active AirNow monitoring stations
        - ⚑ **Real-time Data**: Latest hourly observations
        - πŸ“Š **Air Quality Focus**: OZONE, PM2.5, PM10, NO2, SO2, CO
        - πŸ”„ **Auto-updated**: Fresh data every hour

        **⚠️ Data Note**: This displays preliminary, real-time data for public information.
        For regulatory purposes, use EPA's official AQS data.
        """
    )

    with gr.Row():
        load_button = gr.Button("🎯 Load Accurate Air Quality Map", variant="primary", size="lg")

    status_text = gr.Markdown("Click the button above to load current air quality data with precise coordinates.")

    with gr.Tabs():
        with gr.TabItem("πŸ—ΊοΈ Accurate Map"):
            map_output = gr.HTML(label="Air Quality Map with Precise Coordinates")

        with gr.TabItem("πŸ“Š Station Data"):
            data_table = gr.Dataframe(
                label="Air Quality Monitoring Stations",
                interactive=False
            )

    gr.Markdown(
        """
        ## Data Sources:
        - **Coordinates**: EPA Air Quality System (AQS) - Official monitor locations
        - **Air Quality Data**: AirNow hourly bulk files - Real-time observations
        - **Coverage**: 2,000+ monitoring stations across US, Canada, and parts of Mexico

        ## Files Used:
        - `aqs_monitors.zip` - EPA monitor coordinates (364,377+ records)
        - `HourlyData_YYYYMMDDHH.dat` - AirNow real-time observations

        ## Links:
        - [EPA AQS Data](https://aqs.epa.gov/aqsweb/airdata/download_files.html)
        - [AirNow Bulk Files](https://files.airnowtech.org/airnow/today/)
        - [EPA Monitor Map](https://www.epa.gov/outdoor-air-quality-data/interactive-map-air-quality-monitors)
        """
    )

    # Set up event handler
    load_button.click(
        fn=update_map,
        inputs=[],
        outputs=[map_output, data_table, status_text]
    )


if __name__ == "__main__":
    demo.launch()