import firebase_admin
from firebase_admin import credentials, firestore
from typing import List, Dict, Optional
import pandas as pd
import streamlit as st
import json

from src.utils.anomaly import filter_anomalies

# Import the device lookup function
from ..utils.device_lookup import get_device_name


def initialize_firebase():
    """Initialize Firebase with credentials"""
    try:
        firebase_admin.get_app()
    except ValueError:
        # Get the entire Firebase credentials JSON from secrets
        firebase_creds = json.loads(st.secrets["FIREBASE_CREDENTIALS"])
        cred = credentials.Certificate(firebase_creds)
        firebase_admin.initialize_app(cred)
    return firestore.client()


db = initialize_firebase()


def normalize_device_id(device_info: dict) -> str:
    """Normalize device identifier for aggregation"""
    emulator = "/Emulator" if device_info.get("isEmulator", False) else ""

    # iOS
    if device_info.get("systemName", "").lower() == "ios":
        device_id = device_info.get("deviceId", "Unknown")
        device_name = get_device_name("Apple", device_id, device_id)
        return f"iOS/{device_name}{emulator}"

    # Android
    memory_tier = f"{device_info.get('totalMemory', 0) // (1024**3)}GB"
    model = device_info.get("model", "Unknown")
    manufacturer = device_info.get("brand", "Unknown")
    name = get_device_name(manufacturer, model, model)
    return f"{manufacturer}/{name}/{memory_tier}{emulator}"


def format_params_in_b(params: int) -> float:
    """Format number of parameters in billions"""
    b_value = params / 1e9
    if b_value >= 10:
        return round(b_value, 1)
    elif b_value >= 1:
        return round(b_value, 2)
    else:
        return round(b_value, 3)


def format_leaderboard_data(submissions: List[dict]) -> pd.DataFrame:
    """Format submissions for leaderboard display"""
    formatted_data = []

    for sub in submissions:
        try:
            benchmark_result = sub.get("benchmarkResult", {})
            device_info = sub.get("deviceInfo", {})

            # Skip if missing required data
            if not benchmark_result or not device_info:
                continue

            # Skip if missing initSettings
            if "initSettings" not in benchmark_result:
                continue

            # Skip emulators
            if device_info.get("isEmulator", False):
                continue

            # Skip if benchmark failed (zero or missing performance metrics)
            pp_avg = benchmark_result.get("ppAvg", 0)
            tg_avg = benchmark_result.get("tgAvg", 0)
            if pp_avg <= 0.01 or tg_avg <= 0.01:
                continue

            # Get device ID for iOS devices
            device_id = device_info.get("deviceId", "Unknown")
            platform = device_info.get("systemName", "Unknown").lower()
            if platform == "ios":
                # For iOS, use the device lookup
                device_name = get_device_name("Apple", device_id, device_id)
            elif platform == "android":
                # For Android, use the GitHub repo lookup
                manufacturer = device_info.get("brand", "Unknown")
                model = device_info.get("model", "Unknown")
                device_name = get_device_name(manufacturer, model, model)
            else:
                # For other platforms, use the model as is
                device_name = device_info.get("model", "Unknown")

            formatted_data.append(
                {
                    "Submission ID": benchmark_result.get("uuid", "Unknown"),
                    "Device": device_name,  # Use normalized device name
                    "Device ID": device_id,
                    "Platform": device_info.get("systemName", "Unknown"),
                    "Benchmark": f"{benchmark_result.get('config', {}).get('label', 'Unknown')} (pp: {benchmark_result.get('config', {}).get('pp', 'N/A')}, tg: {benchmark_result.get('config', {}).get('tg', 'N/A')})",
                    "PP Config": benchmark_result.get("config", {}).get("pp", "N/A"),
                    "TG Config": benchmark_result.get("config", {}).get("tg", "N/A"),
                    "Model": benchmark_result.get("modelName", "Unknown"),
                    "Model Size": format_params_in_b(
                        benchmark_result.get("modelNParams", 0)
                    ),
                    "Model File Size": benchmark_result.get("modelSize", 0),
                    "Prompt Processing": round(pp_avg, 2),
                    "Token Generation": round(tg_avg, 2),
                    "Memory Usage (%)": benchmark_result.get("peakMemoryUsage", {}).get(
                        "percentage"
                    ),
                    "Peak Memory (GB)": (
                        round(
                            benchmark_result.get("peakMemoryUsage", {}).get("used", 0)
                            / (1024**3),
                            2,
                        )
                        if benchmark_result.get("peakMemoryUsage", {}).get("used")
                        else None
                    ),
                    "Total Memory (GB)": round(
                        device_info.get("totalMemory", 0) / (1024**3), 2
                    ),
                    "CPU Cores": device_info.get("cpuDetails", {}).get(
                        "cores", "Unknown"
                    ),
                    "Normalized Device ID": normalize_device_id(device_info),
                    "Timestamp": benchmark_result.get("timestamp", "Unknown"),
                    "Model ID": benchmark_result.get("modelId", "Unknown"),
                    "OID": benchmark_result.get("oid"),
                    "n_threads": benchmark_result.get("initSettings", {}).get(
                        "n_threads", -1
                    ),
                    "n_gpu_layers": benchmark_result.get("initSettings", {}).get(
                        "n_gpu_layers", 0
                    ),
                    "flash_attn": benchmark_result.get("initSettings", {}).get(
                        "flash_attn", False
                    ),
                    "cache_type_k": benchmark_result.get("initSettings", {}).get(
                        "cache_type_k", "f16"
                    ),
                    "cache_type_v": benchmark_result.get("initSettings", {}).get(
                        "cache_type_v", "f16"
                    ),
                    "n_context": benchmark_result.get("initSettings", {}).get(
                        "n_context", -1
                    ),
                    "n_batch": benchmark_result.get("initSettings", {}).get(
                        "n_batch", -1
                    ),
                    "n_ubatch": benchmark_result.get("initSettings", {}).get(
                        "n_ubatch", -1
                    ),
                    "Version": device_info.get("version", "Unknown"),
                }
            )
        except Exception as e:
            st.warning(f"Error processing submission: {str(e)}")
            continue

    formatted_df = pd.DataFrame(formatted_data)
    filtered_df, anomalies = filter_anomalies(
        formatted_df, z_threshold=9.0, min_samples=5
    )
    print(
        "Anomalies: ",
        anomalies[["Device ID", "Model", "Metric", "Value", "Mean", "Std"]],
    )
    return filtered_df


async def fetch_leaderboard_data(
    model_name: Optional[str] = None, benchmark_label: Optional[str] = None
) -> pd.DataFrame:
    """Fetch and process leaderboard data from Firestore"""
    try:
        # Navigate to the correct collection path: benchmarks/v1/submissions
        submissions_ref = (
            db.collection("benchmarks").document("v1").collection("submissions")
        )

        # Get all documents
        docs = submissions_ref.stream()
        all_docs = list(docs)

        if len(all_docs) == 0:
            return pd.DataFrame()

        # Process documents and filter in memory
        submissions = []
        for doc in all_docs:
            data = doc.to_dict()
            if not data or "benchmarkResult" not in data:
                continue

            benchmark_result = data["benchmarkResult"]

            # Apply filters
            if (
                model_name
                and model_name != "All"
                and benchmark_result.get("modelName") != model_name
            ):
                continue

            if (
                benchmark_label
                and benchmark_label != "All"
                and benchmark_result.get("config", {}).get("label") != benchmark_label
            ):
                continue

            submissions.append(data)

        return format_leaderboard_data(submissions)

    except Exception as e:
        st.error(f"Error fetching data from Firestore: {str(e)}")
        return pd.DataFrame()