import gradio as gr
import json
import os
import logging
from datetime import datetime
from email.utils import parsedate_to_datetime

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from scraper import fetch_hazard_tweets, fetch_custom_tweets, get_available_hazards, get_available_locations
    from classifier import classify_tweets
    from pg_db import init_db, upsert_hazardous_tweet

    # Initialize database (optional - will work without it)
    try:
        init_db()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.warning(f"Database initialization failed: {e}. App will work without database persistence.")
except ImportError as e:
    logger.error(f"Failed to import required modules: {e}")
    raise


def run_pipeline(limit=20, hazard_type=None, location=None, days_back=1):
    """Run the hazard detection pipeline"""
    try:
        logger.info(f"Starting pipeline with limit: {limit}, hazard: {hazard_type}, location: {location}")

        # Choose search method based on parameters
        if hazard_type or location:
            tweets = fetch_custom_tweets(
                hazard_type=hazard_type,
                location=location,
                limit=limit,
                days_back=days_back
            )
        else:
            tweets = fetch_hazard_tweets(limit=limit)

        logger.info(f"Fetched {len(tweets)} tweets")

        # Process tweets: translate -> classify -> analyze
        logger.info("🔄 Processing tweets (this may take 1-2 minutes for first request)...")
        results = classify_tweets(tweets)
        logger.info(f"✅ Processed {len(results)} tweets (translated, classified, and analyzed)")

        # Store hazardous tweets in database (optional)
        try:
            hazardous_count = 0
            for r in results:
                if r.get('hazardous') == 1:
                    hazardous_count += 1

                    hazards = (r.get('ner') or {}).get('hazards') or []
                    hazard_types = ", ".join(hazards) if hazards else "unknown"

                    locs = (r.get('ner') or {}).get('locations') or []
                    if not locs and r.get('location'):
                        locs = [r['location']]
                    locations = ", ".join(locs) if locs else "unknown"

                    sentiment = r.get('sentiment') or {"label": "unknown", "score": 0.0}

                    created_at = r.get('created_at') or ""
                    tweet_date = ""
                    tweet_time = ""
                    if created_at:
                        dt = None
                        try:
                            dt = parsedate_to_datetime(created_at)
                        except Exception:
                            dt = None
                        if dt is None and 'T' in created_at:
                            try:
                                iso = created_at.replace('Z', '+00:00')
                                dt = datetime.fromisoformat(iso)
                            except Exception:
                                dt = None
                        if dt is not None:
                            tweet_date = dt.date().isoformat()
                            tweet_time = dt.time().strftime('%H:%M:%S')

                    upsert_hazardous_tweet(
                        tweet_url=r.get('tweet_url') or "",
                        hazard_type=hazard_types,
                        location=locations,
                        sentiment_label=sentiment.get('label', 'unknown'),
                        sentiment_score=float(sentiment.get('score', 0.0)),
                        tweet_date=tweet_date,
                        tweet_time=tweet_time,
                    )
            logger.info(f"Stored {hazardous_count} hazardous tweets in database")
        except Exception as db_error:
            logger.warning(f"Database storage failed: {db_error}. Results will not be persisted.")
Results will not be persisted.") return results except Exception as e: logger.error(f"Pipeline failed: {str(e)}") return f"Error: {str(e)}" def analyze_tweets(limit, hazard_type, location, days_back): """Gradio interface function to analyze tweets""" try: limit = int(limit) if limit else 20 days_back = int(days_back) if days_back else 1 # Clean up inputs hazard_type = hazard_type.strip() if hazard_type else None location = location.strip() if location else None results = run_pipeline( limit=limit, hazard_type=hazard_type, location=location, days_back=days_back ) if isinstance(results, str): # Error case return results, "" # Count hazardous tweets hazardous_count = sum(1 for r in results if r.get('hazardous') == 1) total_count = len(results) # Format results for display display_text = f"Analyzed {total_count} tweets, found {hazardous_count} hazardous tweets.\n\n" for i, result in enumerate(results, 1): status = "🚨 HAZARDOUS" if result.get('hazardous') == 1 else "✅ Safe" display_text += f"{i}. {status}\n" display_text += f" Text: {result.get('text', 'N/A')[:100]}...\n" if result.get('translated_text'): display_text += f" Translated: {result.get('translated_text', 'N/A')[:100]}...\n" if result.get('hazardous') == 1: sentiment = result.get('sentiment', {}) display_text += f" Sentiment: {sentiment.get('label', 'unknown')} ({sentiment.get('score', 0):.2f})\n" ner = result.get('ner', {}) if ner.get('hazards'): display_text += f" Hazards: {', '.join(ner.get('hazards', []))}\n" if ner.get('locations'): display_text += f" Locations: {', '.join(ner.get('locations', []))}\n" display_text += f" URL: {result.get('tweet_url', 'N/A')}\n\n" # Create JSON output json_output = json.dumps(results, indent=2, ensure_ascii=False) return display_text, json_output except Exception as e: return f"Error: {str(e)}", "" # Health check endpoint def health_check(): """Simple health check for Docker""" return {"status": "healthy", "message": "Ocean Hazard Detection System is running"} # Create Gradio interface with gr.Blocks(title="Ocean Hazard Detection", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🌊 Ocean Hazard Detection System This system analyzes tweets to detect ocean-related hazards using AI. It: - Scrapes tweets about ocean hazards from Indian coastal regions - Classifies tweets as hazardous or safe using multilingual AI - Translates non-English tweets to English - Analyzes sentiment and extracts hazard types and locations - Stores hazardous tweets in a database for tracking **Note**: This demo uses a limited dataset. In production, it would analyze real-time tweets. 
""") with gr.Row(): with gr.Column(): limit_input = gr.Number( label="Number of tweets to analyze", value=10, minimum=1, maximum=50, step=1 ) days_back_input = gr.Number( label="Days back to search", value=1, minimum=1, maximum=7, step=1 ) analyze_btn = gr.Button("🔍 Analyze Tweets", variant="primary") with gr.Column(): hazard_type_input = gr.Dropdown( label="Hazard Type (Optional)", choices=get_available_hazards() if 'get_available_hazards' in globals() else [], value=None, allow_custom_value=True, info="Select a specific hazard type or leave empty for all hazards" ) location_input = gr.Dropdown( label="Location (Optional)", choices=get_available_locations() if 'get_available_locations' in globals() else [], value=None, allow_custom_value=True, info="Select a specific location or leave empty for all locations" ) with gr.Column(): gr.Markdown("### 📊 Analysis Results") results_text = gr.Textbox( label="Analysis Summary", lines=15, max_lines=20, interactive=False ) with gr.Row(): gr.Markdown("### 📄 Raw JSON Output") json_output = gr.Textbox( label="Complete Analysis Data (JSON)", lines=10, max_lines=15, interactive=False ) # Event handlers analyze_btn.click( fn=analyze_tweets, inputs=[limit_input, hazard_type_input, location_input, days_back_input], outputs=[results_text, json_output] ) # Add some example queries gr.Markdown(""" ### 🔍 What this system looks for: - **Hazard Keywords**: flood, tsunami, cyclone, storm surge, high tide, high waves, swell, coastal flooding, rip current, coastal erosion, water discoloration, algal bloom, marine debris, pollution - **Locations**: Mumbai, Chennai, Kolkata, Odisha, Kerala, Gujarat, Goa, Andhra Pradesh, West Bengal, Vizag, Puri, Bay of Bengal, Arabian Sea - **Languages**: Supports 20+ Indian languages including Hindi, Bengali, Tamil, Telugu, Marathi, Gujarati, and English """) if __name__ == "__main__": # Add health check route demo.launch( server_name="0.0.0.0", # Important for Docker server_port=7860, # Gradio default port show_error=True, # Show errors in the interface share=False, # Don't create public link debug=True # Enable debug mode )