""" Core module for data visualization components. """ import streamlit as st import plotly.express as px import pandas as pd from typing import Optional, Dict, List, Set import plotly.graph_objects as go from ..core.scoring import get_quantization_tier from ..core.glicko2_ranking import analyze_glicko2_rankings def clean_device_id(device_id: str) -> str: """Extract clean device name from normalized ID by removing platform prefix""" if device_id.startswith("iOS/"): return device_id[4:] # Remove "iOS/" return device_id def get_quant_name(factor: float) -> str: """Get human-readable name for quantization factor""" if pd.isna(factor): return "Unknown" if factor >= 1.0: return "No Quantization (F16/F32)" quant_map = { 0.8: "[i]Q8_x", 0.6: "[i]Q6_x", 0.5: "[i]Q5_x", 0.4: "[i]Q4_x", 0.3: "[i]Q3_x", 0.2: "[i]Q2_x", 0.1: "[i]Q1_x", } return quant_map.get(factor, f"Q{int(factor*10)}_x") def create_performance_plot( df: pd.DataFrame, metric: str, title: str, hover_data: List[str] = None ): """Create a performance comparison plot""" if df.empty: return None if hover_data is None: hover_data = [ "CPU Cores", "Peak Memory (GB)", "performance_score", "quant_factor", ] fig = px.bar( df, x="Device", y=metric, color="Platform", title=title, template="plotly_white", barmode="group", hover_data=hover_data, ) fig.update_layout( xaxis_title="Device", yaxis_title="Token/sec" if "Token" in metric else metric, legend_title="Platform", plot_bgcolor="white", height=400, ) return fig def filter_dataframe(df: pd.DataFrame, filters: Dict) -> pd.DataFrame: """Apply all filters to the dataframe""" if df.empty: return df filtered_df = df.copy() # Basic filters if filters["model"] != "All": filtered_df = filtered_df[filtered_df["Model ID"] == filters["model"]] if filters["platform"] != "All": filtered_df = filtered_df[filtered_df["Platform"] == filters["platform"]] if filters["device"] != "All": filtered_df = filtered_df[filtered_df["Device"] == filters["device"]] # Flash Attention filter if filters["flash_attn"] != "All": filtered_df = filtered_df[filtered_df["flash_attn"] == filters["flash_attn"]] # Cache Type filters if filters["cache_type_k"] != "All": filtered_df = filtered_df[ filtered_df["cache_type_k"] == filters["cache_type_k"] ] if filters["cache_type_v"] != "All": filtered_df = filtered_df[ filtered_df["cache_type_v"] == filters["cache_type_v"] ] # Range filters pp_min, pp_max = filters["pp_range"] if pp_min is not None and pp_max is not None: pp_values = filtered_df["PP Config"] filtered_df = filtered_df[(pp_values >= pp_min) & (pp_values <= pp_max)] tg_min, tg_max = filters["tg_range"] if tg_min is not None and tg_max is not None: tg_values = filtered_df["TG Config"] filtered_df = filtered_df[(tg_values >= tg_min) & (tg_values <= tg_max)] n_threads_min, n_threads_max = filters["n_threads"] if n_threads_min is not None and n_threads_max is not None: n_threads = filtered_df["n_threads"] filtered_df = filtered_df[ (n_threads >= n_threads_min) & (n_threads <= n_threads_max) ] n_gpu_layers_min, n_gpu_layers_max = filters["n_gpu_layers"] if n_gpu_layers_min is not None and n_gpu_layers_max is not None: n_gpu_layers = filtered_df["n_gpu_layers"] filtered_df = filtered_df[ (n_gpu_layers >= n_gpu_layers_min) & (n_gpu_layers <= n_gpu_layers_max) ] # Version filter if filters.get("Version") != "All" and filters.get("Version"): filtered_df = filtered_df[filtered_df["Version"] == filters["Version"]] return filtered_df def create_model_size_performance_plot( df: pd.DataFrame, device_id: str, quant_filter: str, title: str 
): """Create a plot showing model size vs performance metrics for a specific device""" if df.empty: return None # Filter for the selected device device_df = df[df["Normalized Device ID"] == device_id].copy() if device_df.empty: return None # Filter by quantization if specified if quant_filter != "All": device_df = device_df[ device_df["Model ID"].apply( lambda x: get_quantization_tier(x) == float(quant_filter) ) ] if device_df.empty: return None # Create a new figure with secondary y-axis fig = go.Figure() # Define shapes for different quantization levels quant_shapes = { 1.0: "circle", # F16/F32 0.8: "square", # Q8 0.6: "diamond", # Q6 0.5: "triangle-up", # Q5 0.4: "triangle-down", # Q4 0.3: "star", # Q3 0.2: "pentagon", # Q2 0.1: "hexagon", # Q1 } # Add Token Generation data (left y-axis) for quant in sorted(device_df["quant_factor"].unique()): quant_df = device_df[device_df["quant_factor"] == quant] if quant_df.empty: continue quant_name = get_quant_name(quant) fig.add_trace( go.Scatter( x=quant_df["Model Size"], y=quant_df["Token Generation"], name=f"{quant_name}", mode="markers", marker=dict( color="#2ecc71", symbol=quant_shapes.get(quant, "circle"), size=10, ), yaxis="y", legendgroup="quant", showlegend=True, ) ) # Add Prompt Processing data (right y-axis) for quant in sorted(device_df["quant_factor"].unique()): quant_df = device_df[device_df["quant_factor"] == quant] if quant_df.empty: continue fig.add_trace( go.Scatter( x=quant_df["Model Size"], y=quant_df["Prompt Processing"], name=f"{quant_name}", mode="markers", marker=dict( color="#e74c3c", symbol=quant_shapes.get(quant, "circle"), size=10, ), yaxis="y2", legendgroup="quant", showlegend=False, # Don't show duplicate quantization entries in legend ) ) # Add trend lines if enough points if len(device_df) > 2: # TG trend line tg_trend = px.scatter( device_df, x="Model Size", y="Token Generation", trendline="lowess" ).data[1] tg_trend.update( line=dict(color="#2ecc71", dash="solid"), name="Token Generation", showlegend=False, # Hide from legend yaxis="y", legendgroup="metric", ) fig.add_trace(tg_trend) # PP trend line pp_trend = px.scatter( device_df, x="Model Size", y="Prompt Processing", trendline="lowess" ).data[1] pp_trend.update( line=dict(color="#e74c3c", dash="solid"), name="Prompt Processing", showlegend=False, # Hide from legend yaxis="y2", legendgroup="metric", ) fig.add_trace(pp_trend) # Update layout with two y-axes fig.update_layout( title=title, xaxis=dict( title="Model Size (B)", gridcolor="lightgrey", range=[0, max(device_df["Model Size"]) * 1.05], ), yaxis=dict( title="Token Generation (t/s)", titlefont=dict(color="#2ecc71"), tickfont=dict(color="#2ecc71"), gridcolor="lightgrey", side="left", range=[0, max(device_df["Token Generation"]) * 1.05], ), yaxis2=dict( title="Prompt Processing (t/s)", titlefont=dict(color="#e74c3c"), tickfont=dict(color="#e74c3c"), anchor="x", overlaying="y", side="right", range=[0, max(device_df["Prompt Processing"]) * 1.05], ), height=400, showlegend=True, plot_bgcolor="white", legend=dict( yanchor="top", y=0.99, xanchor="right", x=0.99, bgcolor="rgba(255, 255, 255, 0.8)", bordercolor="lightgrey", borderwidth=1, groupclick="togglegroup", # Toggle all traces in the same group title="Quantization", # Add legend title ), ) return fig def render_model_size_performance(df: pd.DataFrame, filters: Dict): """Render the model size vs performance section independently""" if df.empty: st.warning("No data available for plotting.") return # Apply all filters from the table size_perf_df = 
    if size_perf_df.empty:
        st.warning("No data matches the selected filters.")
        return

    # Get the device with the highest performance score
    top_device_id = size_perf_df.loc[size_perf_df["performance_score"].idxmax()][
        "Normalized Device ID"
    ]
    device_ids = sorted(size_perf_df["Normalized Device ID"].unique())
    default_index = device_ids.index(top_device_id)

    # Create mapping of normalized IDs to display names
    device_display_names = {
        device_id: clean_device_id(device_id) for device_id in device_ids
    }

    # Create columns for device and quantization selectors
    col1, col2 = st.columns([2, 1])

    with col1:
        # Device selector
        selected_device_id = st.selectbox(
            "Select Device",
            options=device_ids,
            format_func=lambda x: device_display_names[x],
            help="Select a device to view its performance across different model sizes",
            key="size_perf_device_selector",
            placeholder="Search for a device...",
            index=default_index,
        )

    with col2:
        # Quantization filter
        quant_options = ["All"] + [
            str(q) for q in sorted(size_perf_df["quant_factor"].unique())
        ]
        quant_filter = st.selectbox(
            "Filter by Quantization",
            options=quant_options,
            format_func=lambda x: (
                "All Quantizations" if x == "All" else get_quant_name(float(x))
            ),
            help="Filter data points by quantization level",
            key="size_perf_quant_selector",
        )

    # Create and display the model size vs performance plot
    size_perf_fig = create_model_size_performance_plot(
        size_perf_df,
        selected_device_id,
        quant_filter,
        f"Model Size vs Performance Metrics for {device_display_names[selected_device_id]}",
    )
    if size_perf_fig:
        st.plotly_chart(size_perf_fig, use_container_width=True)
    else:
        st.warning("No data available for the selected device and quantization level.")


def render_performance_plots(df: pd.DataFrame, filters: Dict):
    """Render performance comparison plots."""
    if df.empty:
        st.warning("No data available for plotting.")
        return

    # Apply filters
    filtered_df = filter_dataframe(df, filters)
    if filtered_df.empty:
        st.warning("No data matches the selected filters for plotting.")
        return

    # Add Model Size vs Performance section first
    st.markdown("### 📊 Model Size vs Performance")
    render_model_size_performance(df, filters)


def render_leaderboard_table(df: pd.DataFrame, filters: Dict):
    """Render the leaderboard table with grouped and formatted data."""
    if df.empty:
        st.warning("No data available for the selected filters.")
        return

    # Apply filters
    filtered_df = filter_dataframe(df, filters)
    if filtered_df.empty:
        st.warning("No data matches the selected filters.")
        return

    # Define the preferred column order (grouped logically)
    column_order = [
        # Performance Score
        "performance_score",
        "quant_factor",
        # Device Info
        "Device",
        "Platform",
        "CPU Cores",
        "Total Memory (GB)",
        "Peak Memory (GB)",
        "Memory Usage (%)",
        # Benchmark Results
        "PP Config",
        "PP Avg (t/s)",
        "PP Std (t/s)",
        "TG Config",
        "TG Avg (t/s)",
        "TG Std (t/s)",
        # Model Config
        "Model ID",
        "Model Size",
        "n_threads",
        "flash_attn",
        "cache_type_k",
        "cache_type_v",
        "n_context",
        "n_batch",
        "n_ubatch",
        "Version",
    ]

    # Group by selected columns
    grouping_cols = filters["grouping"]
    if not grouping_cols:
        grouping_cols = ["Model ID", "Device", "Platform"]  # Default grouping

    # Create aggregations (excluding grouping columns)
    agg_dict = {
        col: agg
        for col, agg in {
            "Prompt Processing": ["mean", "std"],
            "Token Generation": ["mean", "std"],
            "Peak Memory (GB)": "mean",
            "Total Memory (GB)": "first",
            "CPU Cores": "first",
            "Model Size": "first",
            "Version": lambda x: ", ".join(sorted(set(x))),
            # Collect the distinct values, not the characters of the Series repr
            "n_gpu_layers": lambda x: ", ".join(sorted({str(v) for v in x})),
"performance_score": "mean", "quant_factor": "first", }.items() if col not in grouping_cols } # Group and aggregate grouped_df = filtered_df.groupby(grouping_cols).agg(agg_dict).reset_index() # Flatten column names grouped_df.columns = [ col[0] if col[1] == "" else f"{col[0]} ({col[1]})" for col in grouped_df.columns ] # Rename columns for display column_mapping = { "Prompt Processing (mean)": "PP Avg (t/s)", "Prompt Processing (std)": "PP Std (t/s)", "Token Generation (mean)": "TG Avg (t/s)", "Token Generation (std)": "TG Std (t/s)", "Memory Usage (%) (mean)": "Memory Usage (%)", "Peak Memory (GB) (mean)": "Peak Memory (GB)", "PP Config (first)": "PP Config", "TG Config (first)": "TG Config", "Model Size (first)": "Model Size", "CPU Cores (first)": "CPU Cores", "Total Memory (GB) (first)": "Total Memory (GB)", "n_threads (first)": "n_threads", "flash_attn (first)": "flash_attn", "cache_type_k (first)": "cache_type_k", "cache_type_v (first)": "cache_type_v", "n_context (first)": "n_context", "n_batch (first)": "n_batch", "n_ubatch (first)": "n_ubatch", "Version ()": "Version", "performance_score (mean)": "Performance Score", "quant_factor (first)": "Quant Factor", } grouped_df = grouped_df.rename(columns=column_mapping) # Sort by performance score grouped_df = grouped_df.sort_values("Performance Score", ascending=False) # Filter visible columns visible_cols = filters["visible_columns"] if visible_cols: # Map the user-friendly names to actual column names column_name_mapping = { "Device": "Device", "Platform": "Platform", "CPU Cores": "CPU Cores", "Total Memory (GB)": "Total Memory (GB)", "Peak Memory (GB)": "Peak Memory (GB)", "Memory Usage (%)": "Memory Usage (%)", "PP Config": "PP Config", "TG Config": "TG Config", "Prompt Processing (mean)": "PP Avg (t/s)", "Token Generation (mean)": "TG Avg (t/s)", "Prompt Processing (std)": "PP Std (t/s)", "Token Generation (std)": "TG Std (t/s)", "Model": "Model ID", "Model Size": "Model Size", "Model ID": "Model ID", "n_threads": "n_threads", "flash_attn": "flash_attn", "cache_type_k": "cache_type_k", "cache_type_v": "cache_type_v", "n_context": "n_context", "n_batch": "n_batch", "n_ubatch": "n_ubatch", "Version": "Version", "Performance Score": "Performance Score", "Quant Factor": "Quant Factor", } # Convert visible columns and grouping columns to their mapped names mapped_visible = {column_name_mapping.get(col, col) for col in visible_cols} mapped_grouping = { column_name_mapping.get(col, col) for col in filters["grouping"] } # Always include performance score and quant factor mapped_visible.add("Performance Score") mapped_visible.add("Quant Factor") # Combine both sets to get unique columns all_cols = mapped_visible | mapped_grouping # Create final display columns list display_cols = [] # Get all available columns we want to display available_cols = set(all_cols) # Add columns in the predefined order for col in column_order: if col in available_cols: display_cols.append(col) # Add any remaining columns that weren't in our predefined order remaining_cols = sorted(list(available_cols - set(display_cols))) display_cols.extend(remaining_cols) else: # Default columns if none selected display_cols = ["Performance Score", "Quant Factor"] + column_order[:8] # Display the filtered and grouped table st.markdown("#### 📊 Benchmark Results") st.dataframe( grouped_df[display_cols], use_container_width=True, height=min( 400, (len(grouped_df) + 1) * 35 + 40 ), # Dynamic height based on content hide_index=False, column_config={ "Rank": 
            "Rank": st.column_config.NumberColumn(
                "Rank",
                help="Device ranking based on performance score",
            ),
            "Device": st.column_config.TextColumn(
                "Device",
                help="Device brand and model",
            ),
            "Best Score": st.column_config.NumberColumn(
                "Score", help="Overall performance score (0-100)", format="%.2f"
            ),
            "Best TG Speed": st.column_config.NumberColumn(
                "Best TG Speed (t/s)",
                help="Best token generation speed",
                format="%.2f",
            ),
            "Best PP Speed": st.column_config.NumberColumn(
                "Best PP Speed (t/s)",
                help="Best prompt processing speed",
                format="%.2f",
            ),
        },
    )


def render_device_rankings(df: pd.DataFrame):
    """Render device rankings using the Glicko-2 algorithm."""
    if df.empty:
        st.warning("No data available for device rankings.")
        return

    # Calculate Glicko-2 rankings automatically
    with st.spinner("Calculating Glicko-2 rankings..."):
        try:
            g2_all, g2_confident = analyze_glicko2_rankings(
                df,
                min_matches=5,  # Default minimum matches
                min_gpu_layers=20,  # Default minimum GPU layers
            )

            # Display performance overview
            st.subheader("🏆 Performance Overview")

            # Get top device from Glicko-2 rankings
            top_device = g2_confident.index[0] if not g2_confident.empty else "N/A"
            top_device_clean = (
                clean_device_id(top_device) if top_device != "N/A" else "N/A"
            )

            # Calculate total unique devices and models
            total_devices = df["Normalized Device ID"].nunique()
            total_models = df["Model ID"].nunique()

            # Display metrics in columns
            col1, col2, col3 = st.columns([3, 1, 1])
            with col1:
                st.metric("Top Device", top_device_clean)
            with col2:
                st.metric("Total Devices", total_devices)
            with col3:
                st.metric("Total Models", total_models)

            st.markdown("---")

            # Display confident rankings
            if not g2_confident.empty:
                st.subheader("📱 Device Rankings")

                # Create a copy and handle the index
                g2_confident_display = g2_confident.copy()

                # Get the device ID column name
                device_id_col = g2_confident_display.index.name or "device"
                g2_confident_display = g2_confident_display.reset_index()

                # Get platform information from the original dataframe
                platform_map = (
                    df.groupby("Normalized Device ID")["Platform"].first().to_dict()
                )
                g2_confident_display["Platform"] = g2_confident_display[
                    device_id_col
                ].map(platform_map)

                # Get model size range from the original dataframe
                model_sizes = df.groupby("Normalized Device ID")["Model Size"].agg(
                    ["min", "max"]
                )
                g2_confident_display["Model Size Range"] = g2_confident_display[
                    device_id_col
                ].apply(
                    lambda x: f"{model_sizes.loc[x, 'min']:.1f}B - {model_sizes.loc[x, 'max']:.1f}B"
                )

                # Add clean device name
                g2_confident_display["Device"] = g2_confident_display[
                    device_id_col
                ].apply(clean_device_id)

                # Round numeric columns to whole numbers
                numeric_cols = [
                    "combined_rating",
                    "combined_rd",
                    "token_rating",
                    "prompt_rating",
                ]
                for col in numeric_cols:
                    if col in g2_confident_display.columns:
                        g2_confident_display[col] = (
                            g2_confident_display[col].round(0).astype(int)
                        )

                # Select and order columns for display
                display_cols = [
                    "Device",
                    "Platform",
                    "combined_rating",
                    "combined_rd",
                    "token_rating",
                    "prompt_rating",
                    "Model Size Range",
                ]

                # Rename columns for better display
                rename_map = {
                    "combined_rating": "Rating",
                    "combined_rd": "Rating Deviation",
                    "token_rating": "Token Rating",
                    "prompt_rating": "Prompt Rating",
                }
                g2_confident_display = g2_confident_display.rename(columns=rename_map)

                # Sort by Rating
                g2_confident_display = g2_confident_display.sort_values(
                    "Rating", ascending=False
                )

                # Add rank column
                g2_confident_display = g2_confident_display.reset_index(drop=True)
                g2_confident_display.index = g2_confident_display.index + 1
                g2_confident_display = g2_confident_display.rename_axis("Rank")
                # Display the table
                st.dataframe(
                    g2_confident_display[
                        [
                            "Device",
                            "Platform",
                            "Rating",
                            "Rating Deviation",
                            "Token Rating",
                            "Prompt Rating",
                            "Model Size Range",
                        ]
                    ],
                    use_container_width=True,
                    height=min(600, (len(g2_confident_display) + 1) * 35 + 40),
                    hide_index=False,
                )

                # Platform statistics
                st.markdown("#### Platform Statistics")
                platform_stats = (
                    g2_confident_display.groupby("Platform")
                    .agg(
                        {
                            "Rating": ["mean", "std"],
                        }
                    )
                    .round(0)
                    .fillna(0)  # std is NaN for platforms with a single device; fill before the int cast
                    .astype(int)
                )
                st.dataframe(platform_stats, use_container_width=True)
            else:
                st.warning(
                    "No confident rankings available. Try adjusting the minimum matches threshold."
                )

        except Exception as e:
            st.error(f"Error calculating Glicko-2 rankings: {str(e)}")
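# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the module API). Assumptions: the
# benchmark DataFrame `df` has already been loaded elsewhere with the columns
# referenced above, and a sidebar builds the filter dict; the page wiring is
# hypothetical, but the filter keys below are exactly those read by
# filter_dataframe() plus the "grouping"/"visible_columns" keys used by
# render_leaderboard_table().
#
#     filters = {
#         "model": "All", "platform": "All", "device": "All",
#         "flash_attn": "All", "cache_type_k": "All", "cache_type_v": "All",
#         "pp_range": (None, None), "tg_range": (None, None),
#         "n_threads": (None, None), "n_gpu_layers": (None, None),
#         "Version": "All",
#         "grouping": ["Model ID", "Device", "Platform"],
#         "visible_columns": [],
#     }
#     render_leaderboard_table(df, filters)
#     render_performance_plots(df, filters)
#     render_device_rankings(df)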