""" Smart Waste Segregation System - Hardware-Realistic Simulator with Real Model Simulates complete edge deployment with realistic sensor protocols, motor control, and hardware timing Dependencies: pip install streamlit numpy pillow opencv-python-headless sqlite3 torch torchvision """ import streamlit as st import sqlite3 import json import time import random import numpy as np from PIL import Image import io import base64 from datetime import datetime from dataclasses import dataclass, asdict from typing import Dict, List, Optional, Tuple import threading import queue from enum import Enum import os # PyTorch imports import torch import torch.nn as nn from torchvision import transforms, models # ==================== HARDWARE CONSTANTS ==================== class HardwareConfig: """Realistic hardware specifications""" # Ultrasonic Sensor (HC-SR04) ULTRASONIC_MIN_DISTANCE = 2.0 # cm ULTRASONIC_MAX_DISTANCE = 400.0 # cm ULTRASONIC_ACCURACY = 0.3 # cm ULTRASONIC_READ_TIME = 0.015 # seconds (15ms) # Capacitive Moisture Sensor MOISTURE_MIN_VOLTAGE = 0.0 # V (dry) MOISTURE_MAX_VOLTAGE = 3.3 # V (wet) MOISTURE_ADC_RESOLUTION = 12 # bits (4096 levels) MOISTURE_READ_TIME = 0.010 # seconds (10ms) # Load Cell (HX711) WEIGHT_MAX_CAPACITY = 5000.0 # grams (5kg) WEIGHT_RESOLUTION = 0.1 # grams WEIGHT_STABILIZATION_TIME = 0.5 # seconds WEIGHT_READ_TIME = 0.100 # seconds (100ms) # Servo Motor (MG996R) SERVO_MIN_ANGLE = 0 SERVO_MAX_ANGLE = 180 SERVO_SPEED = 60 # degrees per 0.17 seconds (at 4.8V) SERVO_CURRENT_IDLE = 10 # mA SERVO_CURRENT_MOVING = 500 # mA SERVO_TORQUE = 11 # kg-cm # Camera (Raspberry Pi Camera Module v2) CAMERA_RESOLUTION = (1920, 1080) CAMERA_CAPTURE_TIME = 0.150 # seconds CAMERA_WARMUP_TIME = 2.0 # seconds # Edge Device (Raspberry Pi 4) CPU_CORES = 4 RAM_MB = 4096 INFERENCE_OVERHEAD = 0.050 # seconds # ==================== DATABASE SETUP ==================== def init_database(): """Initialize SQLite database with required tables""" conn = sqlite3.connect('waste_segregation.db', check_same_thread=False) cursor = conn.cursor() # Check if we need to migrate old database try: cursor.execute("SELECT total_pipeline_time_ms FROM events LIMIT 1") except sqlite3.OperationalError: # Column doesn't exist, need to add it try: cursor.execute("ALTER TABLE events ADD COLUMN total_pipeline_time_ms REAL") conn.commit() st.info("✅ Database schema updated successfully") except sqlite3.OperationalError: # Table doesn't exist at all, will be created below pass # Events table cursor.execute(''' CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, image_name TEXT, waste_class TEXT NOT NULL, confidence REAL NOT NULL, distance_cm REAL, moisture_raw INTEGER, moisture_percent REAL, weight_g REAL, servo_angle INTEGER, inference_time_ms REAL, total_pipeline_time_ms REAL, status TEXT, power_consumption_mw REAL ) ''') # Telemetry table cursor.execute(''' CREATE TABLE IF NOT EXISTS telemetry ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, component TEXT NOT NULL, data TEXT NOT NULL ) ''') # Hardware diagnostics table cursor.execute(''' CREATE TABLE IF NOT EXISTS hardware_diagnostics ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, component TEXT NOT NULL, status TEXT NOT NULL, voltage REAL, current_ma REAL, temperature_c REAL, error_count INTEGER ) ''') conn.commit() return conn # ==================== DATA MODELS ==================== @dataclass class SensorReading: """Raw sensor reading with hardware-realistic properties""" value: float raw_value: int # ADC value voltage: float read_time_ms: float noise_level: float timestamp: str @dataclass class SensorData: """Complete sensor suite readings""" distance_cm: float distance_raw: SensorReading moisture_percent: float moisture_raw: SensorReading weight_g: float weight_raw: SensorReading temperature: float timestamp: str total_read_time_ms: float @dataclass class InferenceResult: """AI model inference output with edge device metrics""" waste_class: str confidence: float inference_time_ms: float preprocessing_time_ms: float postprocessing_time_ms: float probabilities: Dict[str, float] model_version: str device_temperature: float cpu_usage_percent: float @dataclass class MotorControl: """Servo motor control with realistic physics""" target_angle: int current_angle: int start_angle: int status: str duration_ms: float power_consumption_mw: float torque_applied: float movement_profile: List[Tuple[float, int]] # (time_ms, angle) class SensorStatus(Enum): """Sensor operational status""" READY = "ready" READING = "reading" STABILIZING = "stabilizing" ERROR = "error" CALIBRATING = "calibrating" # ==================== REALISTIC SENSOR SIMULATION ==================== class UltrasonicSensor: """HC-SR04 Ultrasonic Sensor Simulation""" def __init__(self): self.status = SensorStatus.READY self.error_count = 0 self.last_reading = None self.calibration_offset = random.uniform(-0.2, 0.2) def trigger_read(self, actual_distance: float) -> SensorReading: """Simulate ultrasonic pulse and echo timing""" start_time = time.time() self.status = SensorStatus.READING # Simulate trigger pulse (10μs) and echo wait time.sleep(HardwareConfig.ULTRASONIC_READ_TIME) # Add realistic noise and environmental factors noise = random.gauss(0, HardwareConfig.ULTRASONIC_ACCURACY) temperature_factor = 1.0 + random.uniform(-0.02, 0.02) # Simulate occasional read errors (1% chance) if random.random() < 0.01: self.error_count += 1 measured_distance = -1.0 # Error code else: measured_distance = (actual_distance + noise + self.calibration_offset) * temperature_factor measured_distance = max(HardwareConfig.ULTRASONIC_MIN_DISTANCE, min(HardwareConfig.ULTRASONIC_MAX_DISTANCE, measured_distance)) # Calculate echo time (distance = speed * time / 2) echo_time_us = (measured_distance * 2) / 0.0343 if measured_distance > 0 else 0 read_time = (time.time() - start_time) * 1000 self.status = SensorStatus.READY self.last_reading = measured_distance return SensorReading( value=round(measured_distance, 2), raw_value=int(echo_time_us), voltage=3.3, read_time_ms=round(read_time, 3), noise_level=abs(noise), timestamp=datetime.now().isoformat() ) class MoistureSensor: """Capacitive Soil Moisture Sensor Simulation""" def __init__(self): self.status = SensorStatus.READY self.adc_resolution = 2 ** HardwareConfig.MOISTURE_ADC_RESOLUTION self.calibration_dry = random.randint(3200, 3400) self.calibration_wet = random.randint(1200, 1400) def read_moisture(self, actual_moisture: float) -> SensorReading: """Read capacitive moisture sensor via ADC""" start_time = time.time() self.status = SensorStatus.READING time.sleep(HardwareConfig.MOISTURE_READ_TIME) # Convert moisture percentage to ADC value adc_range = self.calibration_dry - self.calibration_wet adc_value = self.calibration_dry - (actual_moisture * adc_range) # Add ADC noise (±2 LSB typical) adc_noise = random.randint(-2, 2) adc_value = int(adc_value + adc_noise) adc_value = max(0, min(self.adc_resolution - 1, adc_value)) # Convert ADC to voltage voltage = (adc_value / self.adc_resolution) * HardwareConfig.MOISTURE_MAX_VOLTAGE # Convert back to percentage measured_moisture = (self.calibration_dry - adc_value) / adc_range measured_moisture = max(0.0, min(1.0, measured_moisture)) read_time = (time.time() - start_time) * 1000 self.status = SensorStatus.READY return SensorReading( value=round(measured_moisture, 4), raw_value=adc_value, voltage=round(voltage, 3), read_time_ms=round(read_time, 3), noise_level=abs(adc_noise / adc_range) if adc_range != 0 else 0.0, timestamp=datetime.now().isoformat() ) class LoadCellSensor: """HX711 Load Cell Amplifier Simulation""" def __init__(self): self.status = SensorStatus.READY self.tare_value = random.randint(-50000, 50000) self.calibration_factor = random.uniform(420, 450) self.last_stable_reading = 0.0 self.reading_buffer = [] def read_weight(self, actual_weight: float) -> SensorReading: """Read load cell with stabilization""" start_time = time.time() self.status = SensorStatus.STABILIZING # Simulate mechanical settling time time.sleep(HardwareConfig.WEIGHT_STABILIZATION_TIME * 0.2) self.status = SensorStatus.READING # Take multiple readings and average readings = [] for _ in range(10): adc_value = self.tare_value + int(actual_weight * self.calibration_factor) noise = random.randint(-2, 2) vibration = random.gauss(0, 5) adc_value += int(noise + vibration) readings.append(adc_value) time.sleep(HardwareConfig.WEIGHT_READ_TIME / 10) # Average and filter avg_adc = int(np.mean(readings)) measured_weight = (avg_adc - self.tare_value) / self.calibration_factor measured_weight = max(0.0, min(HardwareConfig.WEIGHT_MAX_CAPACITY, measured_weight)) measured_weight = round(measured_weight / HardwareConfig.WEIGHT_RESOLUTION) * HardwareConfig.WEIGHT_RESOLUTION read_time = (time.time() - start_time) * 1000 self.status = SensorStatus.READY self.last_stable_reading = measured_weight return SensorReading( value=round(measured_weight, 2), raw_value=avg_adc, voltage=3.3, read_time_ms=round(read_time, 3), noise_level=np.std(readings) / self.calibration_factor, timestamp=datetime.now().isoformat() ) class SensorHub: """Integrated sensor management system""" def __init__(self): self.ultrasonic = UltrasonicSensor() self.moisture = MoistureSensor() self.load_cell = LoadCellSensor() self.temperature = 28.0 self.baseline = { 'distance_cm': 25.0, 'moisture': 0.3, 'weight_g': 150.0 } def read_all_sensors(self) -> SensorData: """Read all sensors with realistic timing""" start_time = time.time() distance_reading = self.ultrasonic.trigger_read(self.baseline['distance_cm']) moisture_reading = self.moisture.read_moisture(self.baseline['moisture']) weight_reading = self.load_cell.read_weight(self.baseline['weight_g']) self.temperature = 28.0 + random.uniform(-2, 2) total_time = (time.time() - start_time) * 1000 return SensorData( distance_cm=distance_reading.value, distance_raw=distance_reading, moisture_percent=moisture_reading.value * 100, moisture_raw=moisture_reading, weight_g=weight_reading.value, weight_raw=weight_reading, temperature=round(self.temperature, 2), timestamp=datetime.now().isoformat(), total_read_time_ms=round(total_time, 2) ) def set_baseline(self, distance=None, moisture=None, weight=None): """Update sensor baselines for simulation""" if distance is not None: self.baseline['distance_cm'] = distance if moisture is not None: self.baseline['moisture'] = moisture if weight is not None: self.baseline['weight_g'] = weight # ==================== REAL EDGE AI INFERENCE ==================== class EdgeAIProcessor: """Edge device AI inference with REAL trained model""" def __init__(self, model_path: Optional[str] = None): self.model_path = model_path or 'best_trash_classifier.pth' self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.model = None self.model_loaded = False self.model_version = "v1.0.0-mock" self.warmup_complete = False self.classes = [] # Edge device specs self.device_temp = 45.0 self.cpu_usage = 25.0 # Image preprocessing transform self.transform = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) # Try to load real model self.load_model() def load_model(self): """Load the trained PyTorch model""" try: if not os.path.exists(self.model_path): st.warning(f"⚠️ Model file '{self.model_path}' not found. Using mock inference.") self.classes = ["organic", "recyclable", "non-recyclable"] return # Load checkpoint checkpoint = torch.load(self.model_path, map_location=self.device) # Get class names from checkpoint self.classes = checkpoint.get('class_names', ["organic", "recyclable", "non-recyclable"]) num_classes = len(self.classes) # Build model architecture (same as training) self.model = models.efficientnet_v2_s(weights=None) num_features = self.model.classifier[1].in_features self.model.classifier = nn.Sequential( nn.Dropout(p=0.3, inplace=True), nn.Linear(num_features, 512), nn.ReLU(), nn.Dropout(p=0.3), nn.Linear(512, num_classes) ) # Load weights self.model.load_state_dict(checkpoint['model_state_dict']) self.model = self.model.to(self.device) self.model.eval() self.model_loaded = True self.model_version = f"v1.0.0-efficientnet-epoch{checkpoint.get('epoch', 'unknown')}" st.success(f"✅ Model loaded successfully! Classes: {self.classes}") except Exception as e: st.error(f"❌ Failed to load model: {e}") st.info("Using mock inference mode") self.classes = ["organic", "recyclable", "non-recyclable"] self.model_loaded = False def warmup(self): """Warm up model (first inference is slower)""" if not self.warmup_complete and self.model_loaded: try: dummy_input = torch.randn(1, 3, 224, 224).to(self.device) with torch.no_grad(): _ = self.model(dummy_input) self.warmup_complete = True except: pass def preprocess_image(self, image: Image.Image) -> Tuple[torch.Tensor, float]: """Preprocess image for inference with timing""" start_time = time.time() # Convert to RGB if needed if image.mode != 'RGB': image = image.convert('RGB') # Apply transforms img_tensor = self.transform(image) img_tensor = img_tensor.unsqueeze(0) # Add batch dimension preprocess_time = (time.time() - start_time) * 1000 return img_tensor, preprocess_time def infer(self, image: Image.Image, sensor_data: SensorData) -> InferenceResult: """Run inference with real or mock model""" self.warmup() # Preprocessing input_tensor, preprocess_time = self.preprocess_image(image) # Inference inference_start = time.time() if self.model_loaded and self.model is not None: # REAL MODEL INFERENCE try: input_tensor = input_tensor.to(self.device) with torch.no_grad(): outputs = self.model(input_tensor) logits = outputs[0].cpu().numpy() except Exception as e: st.error(f"Inference error: {e}") # Fallback to mock logits = self._mock_inference_with_fusion(sensor_data) else: # MOCK INFERENCE with sensor fusion logits = self._mock_inference_with_fusion(sensor_data) # Simulate realistic inference time time.sleep(random.uniform(0.050, 0.150)) inference_time = (time.time() - inference_start) * 1000 # Postprocessing postprocess_start = time.time() # Softmax exp_logits = np.exp(logits - np.max(logits)) probs = exp_logits / np.sum(exp_logits) class_idx = int(np.argmax(probs)) waste_class = self.classes[class_idx] confidence = float(probs[class_idx]) prob_dict = {cls: float(probs[i]) for i, cls in enumerate(self.classes)} postprocess_time = (time.time() - postprocess_start) * 1000 # Update device metrics self.device_temp = min(85.0, self.device_temp + random.uniform(-1, 2)) self.cpu_usage = min(100.0, max(20.0, self.cpu_usage + random.uniform(-10, 15))) return InferenceResult( waste_class=waste_class, confidence=round(confidence, 4), inference_time_ms=round(inference_time, 2), preprocessing_time_ms=round(preprocess_time, 2), postprocessing_time_ms=round(postprocess_time, 2), probabilities=prob_dict, model_version=self.model_version, device_temperature=round(self.device_temp, 1), cpu_usage_percent=round(self.cpu_usage, 1) ) def _mock_inference_with_fusion(self, sensor_data: SensorData) -> np.ndarray: """Mock inference using multi-modal sensor fusion""" logits = np.random.randn(len(self.classes)) * 0.3 # Moisture-based classification moisture = sensor_data.moisture_percent / 100.0 if moisture > 0.5: logits[0] += 2.5 # High moisture -> organic elif moisture < 0.2: logits[1] += 1.5 # Low moisture -> recyclable else: logits[2] += 1.0 # Medium moisture -> non-recyclable # Weight-based refinement weight = sensor_data.weight_g if weight < 50: logits[1] += 0.5 # Light items often recyclable elif weight > 300: logits[2] += 0.5 # Heavy items often non-recyclable # Distance-based confidence adjustment distance_factor = 1.0 - (sensor_data.distance_cm / 100.0) logits *= (0.7 + 0.3 * distance_factor) return logits # ==================== REALISTIC MOTOR CONTROL ==================== class ServoMotor: """MG996R Servo Motor Simulation with realistic physics""" # Bin routing angles BIN_POSITIONS = { "organic": 0, "recyclable": 90, "non-recyclable": 180 } def __init__(self): self.current_angle = 90 self.target_angle = 90 self.is_moving = False self.speed_deg_per_sec = HardwareConfig.SERVO_SPEED / 0.17 self.voltage = 5.0 self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE self.position_history = [] def calculate_movement_profile(self, start: int, end: int, duration_s: float) -> List[Tuple[float, int]]: """Calculate realistic servo movement with acceleration/deceleration""" profile = [] steps = max(10, int(duration_s * 50)) for i in range(steps + 1): t = i / steps # S-curve motion profile if t < 0.3: progress = (t / 0.3) ** 2 * 0.3 elif t > 0.7: progress = 1.0 - ((1.0 - t) / 0.3) ** 2 * 0.3 else: progress = 0.3 + (t - 0.3) * (0.4 / 0.4) angle = int(start + (end - start) * progress) time_ms = t * duration_s * 1000 profile.append((round(time_ms, 1), angle)) return profile def move_to_bin(self, waste_class: str) -> MotorControl: """Move servo to route waste to correct bin with realistic physics""" target = self.BIN_POSITIONS.get(waste_class, 90) start_angle = self.current_angle start_time = time.time() self.is_moving = True self.target_angle = target self.current_ma = HardwareConfig.SERVO_CURRENT_MOVING angle_distance = abs(target - start_angle) duration_s = (angle_distance / 60.0) * 0.17 movement_profile = self.calculate_movement_profile(start_angle, target, duration_s) # Simulate movement time.sleep(duration_s * 0.3) avg_current = (HardwareConfig.SERVO_CURRENT_MOVING + HardwareConfig.SERVO_CURRENT_IDLE) / 2 power_mw = self.voltage * avg_current torque_applied = HardwareConfig.SERVO_TORQUE * 0.7 self.current_angle = target self.is_moving = False self.current_ma = HardwareConfig.SERVO_CURRENT_IDLE actual_duration = (time.time() - start_time) * 1000 return MotorControl( target_angle=target, current_angle=target, start_angle=start_angle, status="completed", duration_ms=round(actual_duration, 2), power_consumption_mw=round(power_mw, 2), torque_applied=round(torque_applied, 2), movement_profile=movement_profile ) def get_diagnostics(self) -> dict: """Get motor diagnostic information""" return { 'current_angle': self.current_angle, 'target_angle': self.target_angle, 'is_moving': self.is_moving, 'current_ma': self.current_ma, 'voltage': self.voltage, 'temperature_c': 35.0 + random.uniform(-2, 5) } # ==================== CAMERA SIMULATION ==================== class CameraModule: """Raspberry Pi Camera Module v2 Simulation""" def __init__(self): self.resolution = HardwareConfig.CAMERA_RESOLUTION self.is_warmed_up = False self.frame_count = 0 def warmup(self): """Camera warmup (auto-exposure, white balance)""" if not self.is_warmed_up: time.sleep(HardwareConfig.CAMERA_WARMUP_TIME * 0.1) self.is_warmed_up = True def capture_image(self, uploaded_image: Optional[Image.Image] = None) -> Tuple[Image.Image, float]: """Capture image with realistic timing""" self.warmup() start_time = time.time() time.sleep(HardwareConfig.CAMERA_CAPTURE_TIME * 0.5) if uploaded_image: image = uploaded_image else: image = self._generate_synthetic_waste() capture_time = (time.time() - start_time) * 1000 self.frame_count += 1 return image, capture_time def _generate_synthetic_waste(self) -> Image.Image: """Generate synthetic waste image for testing""" colors = [ (139, 90, 43), # Brown (organic) (200, 200, 200), # Gray (recyclable) (50, 50, 50) # Dark (non-recyclable) ] color = random.choice(colors) color = tuple(max(0, min(255, c + random.randint(-30, 30))) for c in color) image = Image.new('RGB', (224, 224), color=color) return image # ==================== DATA LOGGER ==================== class DataLogger: """Logs all events, telemetry, and hardware diagnostics to SQLite""" def __init__(self, conn): self.conn = conn self.lock = threading.Lock() def log_event(self, sensor_data: SensorData, inference: InferenceResult, motor: MotorControl, image_name: str = "sample.jpg"): """Log complete segregation event with hardware metrics""" with self.lock: cursor = self.conn.cursor() total_time = (sensor_data.total_read_time_ms + inference.preprocessing_time_ms + inference.inference_time_ms + inference.postprocessing_time_ms + motor.duration_ms) cursor.execute(''' INSERT INTO events ( timestamp, image_name, waste_class, confidence, distance_cm, moisture_raw, moisture_percent, weight_g, servo_angle, inference_time_ms, total_pipeline_time_ms, status, power_consumption_mw ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( sensor_data.timestamp, image_name, inference.waste_class, inference.confidence, sensor_data.distance_cm, sensor_data.moisture_raw.raw_value, sensor_data.moisture_percent, sensor_data.weight_g, motor.target_angle, inference.inference_time_ms, total_time, motor.status, motor.power_consumption_mw )) self.conn.commit() def log_telemetry(self, component: str, data: dict): """Log component telemetry""" with self.lock: cursor = self.conn.cursor() cursor.execute(''' INSERT INTO telemetry (timestamp, component, data) VALUES (?, ?, ?) ''', ( datetime.now().isoformat(), component, json.dumps(data) )) self.conn.commit() def log_hardware_diagnostics(self, component: str, status: str, voltage: float = 0, current_ma: float = 0, temperature: float = 0, error_count: int = 0): """Log hardware diagnostics""" with self.lock: cursor = self.conn.cursor() cursor.execute(''' INSERT INTO hardware_diagnostics (timestamp, component, status, voltage, current_ma, temperature_c, error_count) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( datetime.now().isoformat(), component, status, voltage, current_ma, temperature, error_count )) self.conn.commit() def get_recent_events(self, limit: int = 10) -> List[dict]: """Retrieve recent events""" cursor = self.conn.cursor() cursor.execute(''' SELECT * FROM events ORDER BY timestamp DESC LIMIT ? ''', (limit,)) columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in cursor.fetchall()] def get_statistics(self) -> dict: """Get overall statistics""" cursor = self.conn.cursor() # Class distribution cursor.execute(''' SELECT waste_class, COUNT(*) as count FROM events GROUP BY waste_class ''') class_dist = dict(cursor.fetchall()) # Average confidence cursor.execute('SELECT AVG(confidence) FROM events') avg_conf = cursor.fetchone()[0] or 0 # Average pipeline time cursor.execute('SELECT AVG(total_pipeline_time_ms) FROM events') avg_time = cursor.fetchone()[0] or 0 # Total events cursor.execute('SELECT COUNT(*) FROM events') total = cursor.fetchone()[0] # Total power consumption cursor.execute('SELECT SUM(power_consumption_mw) FROM events') total_power = cursor.fetchone()[0] or 0 return { 'total_events': total, 'class_distribution': class_dist, 'average_confidence': round(avg_conf, 4), 'average_pipeline_time_ms': round(avg_time, 2), 'total_power_consumption_mwh': round(total_power / 3600000, 4) } def get_hardware_diagnostics(self, limit: int = 20) -> List[dict]: """Get recent hardware diagnostics""" cursor = self.conn.cursor() cursor.execute(''' SELECT * FROM hardware_diagnostics ORDER BY timestamp DESC LIMIT ? ''', (limit,)) columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in cursor.fetchall()] # ==================== STREAMLIT DASHBOARD ==================== def main(): st.set_page_config( page_title="Smart Waste Segregation System", page_icon="♻️", layout="wide" ) # Initialize components if 'db_conn' not in st.session_state: st.session_state.db_conn = init_database() if 'sensor_hub' not in st.session_state: st.session_state.sensor_hub = SensorHub() if 'edge_ai' not in st.session_state: st.session_state.edge_ai = EdgeAIProcessor() if 'servo' not in st.session_state: st.session_state.servo = ServoMotor() if 'camera' not in st.session_state: st.session_state.camera = CameraModule() if 'logger' not in st.session_state: st.session_state.logger = DataLogger(st.session_state.db_conn) if 'execution_log' not in st.session_state: st.session_state.execution_log = [] # Header st.title("♻️ Smart Waste Segregation System - Hardware Simulator") st.markdown("**IDEA-ONE Hackathon | Team Trackify | Edge AI Deployment Demo**") # Model status indicator if st.session_state.edge_ai.model_loaded: st.success(f"✅ Real Model Loaded: {st.session_state.edge_ai.model_version}") else: st.warning("⚠️ Using Mock Inference (Model not found)") st.markdown("---") # Hardware status bar col1, col2, col3, col4, col5 = st.columns(5) with col1: st.metric("🌡️ Device Temp", f"{st.session_state.edge_ai.device_temp:.1f}°C") with col2: st.metric("💻 CPU Usage", f"{st.session_state.edge_ai.cpu_usage:.1f}%") with col3: st.metric("⚡ Servo Angle", f"{st.session_state.servo.current_angle}°") with col4: st.metric("📷 Frames", st.session_state.camera.frame_count) with col5: st.metric("🔌 Servo Current", f"{st.session_state.servo.current_ma} mA") st.markdown("---") # Tabs tab1, tab2, tab3, tab4, tab5 = st.tabs([ "🔄 Live Pipeline", "📊 Dashboard", "📋 Event Logs", "🔧 Hardware Diagnostics", "📡 Sensor Details" ]) # ==================== TAB 1: LIVE PIPELINE ==================== with tab1: col1, col2 = st.columns([1, 2]) with col1: st.subheader("⚙️ Sensor Configuration") st.markdown("##### Ultrasonic Sensor (HC-SR04)") distance = st.slider("Distance (cm)", 2.0, 60.0, 25.0, 0.5) st.markdown("##### Capacitive Moisture Sensor") moisture = st.slider("Moisture (%)", 0.0, 100.0, 30.0, 1.0) / 100.0 st.markdown("##### Load Cell (HX711)") weight = st.slider("Weight (g)", 0.0, 1000.0, 150.0, 1.0) st.session_state.sensor_hub.set_baseline(distance, moisture, weight) st.markdown("---") # Image upload st.subheader("📷 Camera Input") uploaded_file = st.file_uploader( "Upload waste image (optional)", type=['jpg', 'png', 'jpeg'] ) if uploaded_file: image = Image.open(uploaded_file) st.image(image, width=250, caption="Uploaded Image") else: st.info("No image uploaded. Will use synthetic waste image.") st.markdown("---") # Execute button if st.button("🚀 Run Complete Pipeline", type="primary", use_container_width=True): st.session_state.execution_log = [] with st.spinner("Processing..."): pipeline_start = time.time() # Step 1: Camera capture st.session_state.execution_log.append("📷 Capturing image from camera...") try: if uploaded_file: image = Image.open(uploaded_file) capture_time = 0 else: image, capture_time = st.session_state.camera.capture_image() st.session_state.execution_log.append(f" ✓ Image captured ({capture_time:.2f}ms)") except Exception as e: st.session_state.execution_log.append(f" ⚠️ Image capture failed: {e}") st.session_state.execution_log.append(" 🔄 Generating synthetic image...") image = st.session_state.camera._generate_synthetic_waste() capture_time = 0 # Step 2: Sensor reading st.session_state.execution_log.append("📡 Reading sensor suite...") sensor_data = st.session_state.sensor_hub.read_all_sensors() st.session_state.execution_log.append(f" ✓ Sensors read ({sensor_data.total_read_time_ms:.2f}ms)") st.session_state.execution_log.append(f" - Distance: {sensor_data.distance_cm} cm") st.session_state.execution_log.append(f" - Moisture: {sensor_data.moisture_percent:.1f}%") st.session_state.execution_log.append(f" - Weight: {sensor_data.weight_g} g") # Log sensor telemetry st.session_state.logger.log_telemetry("sensors", { 'distance': asdict(sensor_data.distance_raw), 'moisture': asdict(sensor_data.moisture_raw), 'weight': asdict(sensor_data.weight_raw), 'temperature': sensor_data.temperature }) # Step 3: AI Inference st.session_state.execution_log.append("🤖 Running edge AI inference...") inference = st.session_state.edge_ai.infer(image, sensor_data) st.session_state.execution_log.append(f" ✓ Inference complete ({inference.inference_time_ms:.2f}ms)") st.session_state.execution_log.append(f" - Preprocessing: {inference.preprocessing_time_ms:.2f}ms") st.session_state.execution_log.append(f" - Model inference: {inference.inference_time_ms:.2f}ms") st.session_state.execution_log.append(f" - Postprocessing: {inference.postprocessing_time_ms:.2f}ms") st.session_state.execution_log.append(f" - Classification: {inference.waste_class} ({inference.confidence*100:.2f}%)") # Log AI telemetry st.session_state.logger.log_telemetry("edge_ai", asdict(inference)) # Step 4: Motor control st.session_state.execution_log.append(f"⚙️ Actuating servo to {inference.waste_class} bin...") motor = st.session_state.servo.move_to_bin(inference.waste_class) st.session_state.execution_log.append(f" ✓ Servo moved ({motor.duration_ms:.2f}ms)") st.session_state.execution_log.append(f" - Start angle: {motor.start_angle}°") st.session_state.execution_log.append(f" - Target angle: {motor.target_angle}°") st.session_state.execution_log.append(f" - Power consumption: {motor.power_consumption_mw:.2f} mW") # Log motor telemetry st.session_state.logger.log_telemetry("servo_motor", asdict(motor)) # Step 5: Log event st.session_state.execution_log.append("💾 Logging event to database...") image_name = uploaded_file.name if uploaded_file else "synthetic.jpg" st.session_state.logger.log_event(sensor_data, inference, motor, image_name) # Log hardware diagnostics st.session_state.logger.log_hardware_diagnostics( "ultrasonic", "operational", 3.3, 15, sensor_data.temperature, st.session_state.sensor_hub.ultrasonic.error_count ) st.session_state.logger.log_hardware_diagnostics( "servo_motor", "operational", 5.0, st.session_state.servo.current_ma, 35.0, 0 ) pipeline_time = (time.time() - pipeline_start) * 1000 st.session_state.execution_log.append(f"✅ Pipeline completed! Total time: {pipeline_time:.2f}ms") # Store results st.session_state.last_result = { 'sensor': sensor_data, 'inference': inference, 'motor': motor, 'image': image, 'pipeline_time': pipeline_time } st.success("✅ Segregation completed successfully!") st.balloons() with col2: st.subheader("🔄 Pipeline Execution Log") # Execution log if st.session_state.execution_log: log_container = st.container() with log_container: for log_entry in st.session_state.execution_log: if "✓" in log_entry or "✅" in log_entry: st.success(log_entry) elif "📡" in log_entry or "🤖" in log_entry or "⚙️" in log_entry or "📷" in log_entry: st.info(log_entry) else: st.write(log_entry) st.markdown("---") # Results if 'last_result' in st.session_state: res = st.session_state.last_result st.subheader("📊 Pipeline Results") # Display captured image col_img, col_metrics = st.columns([1, 2]) with col_img: st.image(res['image'], caption="Processed Image", use_container_width=True) with col_metrics: st.metric("🗑️ Classification", res['inference'].waste_class.upper()) st.metric("📊 Confidence", f"{res['inference'].confidence*100:.2f}%") st.metric("🤖 Inference Time", f"{res['inference'].inference_time_ms:.2f} ms") st.metric("⏱️ Total Pipeline Time", f"{res['pipeline_time']:.2f} ms") st.markdown("---") st.metric("⚙️ Servo Angle", f"{res['motor'].target_angle}°") st.metric("⚡ Power Used", f"{res['motor'].power_consumption_mw:.2f} mW") st.markdown("---") # Class probabilities st.markdown("#### 📈 Classification Probabilities") for cls, prob in res['inference'].probabilities.items(): st.progress(prob, text=f"{cls.capitalize()}: {prob*100:.2f}%") st.markdown("---") # Detailed telemetry st.subheader("📡 Detailed Component Telemetry") tel_col1, tel_col2, tel_col3 = st.columns(3) with tel_col1: with st.expander("📏 Sensor Readings", expanded=False): sensor_data = { 'distance_cm': res['sensor'].distance_cm, 'distance_raw_adc': res['sensor'].distance_raw.raw_value, 'moisture_percent': res['sensor'].moisture_percent, 'moisture_raw_adc': res['sensor'].moisture_raw.raw_value, 'weight_g': res['sensor'].weight_g, 'weight_raw_adc': res['sensor'].weight_raw.raw_value, 'temperature_c': res['sensor'].temperature, 'total_read_time_ms': res['sensor'].total_read_time_ms } st.json(sensor_data) with tel_col2: with st.expander("🤖 AI Inference", expanded=False): inference_data = { 'waste_class': res['inference'].waste_class, 'confidence': res['inference'].confidence, 'probabilities': res['inference'].probabilities, 'preprocessing_ms': res['inference'].preprocessing_time_ms, 'inference_ms': res['inference'].inference_time_ms, 'postprocessing_ms': res['inference'].postprocessing_time_ms, 'model_version': res['inference'].model_version, 'device_temp_c': res['inference'].device_temperature, 'cpu_usage_percent': res['inference'].cpu_usage_percent } st.json(inference_data) with tel_col3: with st.expander("⚙️ Motor Control", expanded=False): motor_data = { 'start_angle': res['motor'].start_angle, 'target_angle': res['motor'].target_angle, 'current_angle': res['motor'].current_angle, 'duration_ms': res['motor'].duration_ms, 'power_consumption_mw': res['motor'].power_consumption_mw, 'torque_applied_kgcm': res['motor'].torque_applied, 'status': res['motor'].status, 'movement_steps': len(res['motor'].movement_profile) } st.json(motor_data) else: st.info("👈 Configure sensors and click 'Run Complete Pipeline' to start") st.markdown("### 🗺️ System Architecture") st.markdown(""" **Hardware Components:** - 📷 **Camera**: Raspberry Pi Camera Module v2 (1920x1080) - 📏 **Ultrasonic**: HC-SR04 (2-400cm range) - 💧 **Moisture**: Capacitive sensor (12-bit ADC) - ⚖️ **Weight**: HX711 Load Cell (5kg capacity) - ⚙️ **Servo**: MG996R (180° rotation, 11 kg-cm torque) - 💻 **Edge Device**: Raspberry Pi 4 (4GB RAM, Quad-core) **Pipeline Flow:** 1. Camera captures waste image 2. Sensors read physical properties 3. Edge AI processes multi-modal data 4. Servo routes waste to correct bin 5. Event logged to database """) # ==================== TAB 2: DASHBOARD ==================== with tab2: st.subheader("📊 System Statistics") stats = st.session_state.logger.get_statistics() col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Total Events", stats['total_events']) with col2: st.metric("Avg Confidence", f"{stats['average_confidence']*100:.2f}%") with col3: st.metric("Avg Pipeline Time", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms") with col4: st.metric("Total Power Used", f"{stats.get('total_power_consumption_mwh', 0):.4f} mWh") st.markdown("---") # Class distribution chart col_chart1, col_chart2 = st.columns(2) with col_chart1: st.subheader("📈 Waste Class Distribution") if stats['class_distribution']: import pandas as pd df = pd.DataFrame( list(stats['class_distribution'].items()), columns=['Class', 'Count'] ) st.bar_chart(df.set_index('Class')) else: st.info("No data yet. Run some segregations first!") with col_chart2: st.subheader("⚡ System Performance") if stats['total_events'] > 0: st.metric("Average Inference", f"{stats.get('average_pipeline_time_ms', 0):.2f} ms") st.metric("System Efficiency", f"{stats['average_confidence']*100:.1f}%") st.metric("Power Efficiency", f"{stats.get('total_power_consumption_mwh', 0)*1000:.2f} mW") else: st.info("No performance data yet.") # ==================== TAB 3: EVENT LOGS ==================== with tab3: st.subheader("📋 Recent Events") num_events = st.slider("Number of events to display", 5, 50, 10) events = st.session_state.logger.get_recent_events(num_events) if events: import pandas as pd df = pd.DataFrame(events) display_cols = [ 'timestamp', 'waste_class', 'confidence', 'distance_cm', 'moisture_percent', 'weight_g', 'servo_angle', 'inference_time_ms', 'total_pipeline_time_ms', 'power_consumption_mw', 'status' ] available_cols = [col for col in display_cols if col in df.columns] st.dataframe( df[available_cols], use_container_width=True, hide_index=True ) # Download button csv = df.to_csv(index=False) st.download_button( "📥 Download CSV", csv, "events.csv", "text/csv", use_container_width=True ) else: st.info("No events logged yet.") # ==================== TAB 4: HARDWARE DIAGNOSTICS ==================== with tab4: st.subheader("🔧 Hardware Diagnostics") diagnostics = st.session_state.logger.get_hardware_diagnostics(20) if diagnostics: import pandas as pd df = pd.DataFrame(diagnostics) st.dataframe( df[['timestamp', 'component', 'status', 'voltage', 'current_ma', 'temperature_c', 'error_count']], use_container_width=True, hide_index=True ) st.markdown("---") # Component status st.subheader("📊 Component Status") col1, col2, col3 = st.columns(3) with col1: st.markdown("##### Ultrasonic Sensor") st.write(f"Status: {st.session_state.sensor_hub.ultrasonic.status.value}") st.write(f"Errors: {st.session_state.sensor_hub.ultrasonic.error_count}") st.write(f"Last Reading: {st.session_state.sensor_hub.ultrasonic.last_reading} cm") with col2: st.markdown("##### Servo Motor") motor_diag = st.session_state.servo.get_diagnostics() st.write(f"Angle: {motor_diag['current_angle']}°") st.write(f"Current: {motor_diag['current_ma']} mA") st.write(f"Temperature: {motor_diag['temperature_c']:.1f}°C") with col3: st.markdown("##### Edge Device") st.write(f"CPU: {st.session_state.edge_ai.cpu_usage:.1f}%") st.write(f"Temperature: {st.session_state.edge_ai.device_temp:.1f}°C") st.write(f"Model: {st.session_state.edge_ai.model_version}") else: st.info("No diagnostics data yet.") # ==================== TAB 5: SENSOR DETAILS ==================== with tab5: st.subheader("📡 Detailed Sensor Information") col1, col2 = st.columns(2) with col1: st.markdown("### Ultrasonic Sensor (HC-SR04)") st.markdown(f""" - **Range**: {HardwareConfig.ULTRASONIC_MIN_DISTANCE} - {HardwareConfig.ULTRASONIC_MAX_DISTANCE} cm - **Accuracy**: ±{HardwareConfig.ULTRASONIC_ACCURACY} cm - **Read Time**: {HardwareConfig.ULTRASONIC_READ_TIME*1000:.1f} ms - **Operating Voltage**: 5V DC - **Current**: 15 mA - **Frequency**: 40 kHz """) st.markdown("### Capacitive Moisture Sensor") st.markdown(f""" - **Voltage Range**: {HardwareConfig.MOISTURE_MIN_VOLTAGE} - {HardwareConfig.MOISTURE_MAX_VOLTAGE} V - **ADC Resolution**: {HardwareConfig.MOISTURE_ADC_RESOLUTION} bits ({2**HardwareConfig.MOISTURE_ADC_RESOLUTION} levels) - **Read Time**: {HardwareConfig.MOISTURE_READ_TIME*1000:.1f} ms - **Operating Voltage**: 3.3V - 5.5V - **Current**: 5 mA """) with col2: st.markdown("### Load Cell (HX711)") st.markdown(f""" - **Capacity**: {HardwareConfig.WEIGHT_MAX_CAPACITY} g - **Resolution**: {HardwareConfig.WEIGHT_RESOLUTION} g - **Stabilization Time**: {HardwareConfig.WEIGHT_STABILIZATION_TIME*1000:.0f} ms - **Read Time**: {HardwareConfig.WEIGHT_READ_TIME*1000:.0f} ms - **ADC**: 24-bit - **Sample Rate**: 10/80 Hz """) st.markdown("### Servo Motor (MG996R)") st.markdown(f""" - **Rotation**: {HardwareConfig.SERVO_MIN_ANGLE}° - {HardwareConfig.SERVO_MAX_ANGLE}° - **Speed**: 60°/0.17s (at 4.8V) - **Torque**: {HardwareConfig.SERVO_TORQUE} kg-cm - **Operating Voltage**: 4.8V - 7.2V - **Current (Idle)**: {HardwareConfig.SERVO_CURRENT_IDLE} mA - **Current (Moving)**: {HardwareConfig.SERVO_CURRENT_MOVING} mA """) st.markdown("---") st.markdown("### 📷 Camera Module (Raspberry Pi Camera v2)") st.markdown(f""" - **Sensor**: Sony IMX219 8MP - **Resolution**: {HardwareConfig.CAMERA_RESOLUTION[0]}x{HardwareConfig.CAMERA_RESOLUTION[1]} - **Capture Time**: {HardwareConfig.CAMERA_CAPTURE_TIME*1000:.0f} ms - **Warmup Time**: {HardwareConfig.CAMERA_WARMUP_TIME:.1f} s - **Interface**: CSI (Camera Serial Interface) - **Frame Rate**: 30 fps (1080p) """) st.markdown("### 💻 Edge Device (Raspberry Pi 4)") st.markdown(f""" - **CPU**: Quad-core Cortex-A72 @ 1.5GHz - **RAM**: {HardwareConfig.RAM_MB} MB - **Cores**: {HardwareConfig.CPU_CORES} - **Inference Overhead**: {HardwareConfig.INFERENCE_OVERHEAD*1000:.0f} ms - **Operating System**: Raspberry Pi OS (Linux) - **AI Framework**: PyTorch / ONNX Runtime """) # Footer st.markdown("---") st.caption("Smart Waste Segregation System | LDRP-ITR, Gandhinagar | IDEA-ONE Hackathon 2025") st.caption("Hardware-Realistic Simulation | Edge AI Deployment | Multi-Modal Sensor Fusion") if __name__ == "__main__": main()