#!/usr/bin/env python3 """ AusCyberBench Evaluation Dashboard Interactive Gradio Space for benchmarking LLMs on Australian cybersecurity knowledge """ import gradio as gr import spaces import torch import gc import json import re import time import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from pathlib import Path from collections import defaultdict from datasets import load_dataset from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig import numpy as np # Australian color scheme AUSSIE_GREEN = '#008751' AUSSIE_GOLD = '#FFB81C' # Model categories - proven stable models MODELS_BY_CATEGORY = { "✅ Recommended (Tested)": [ "microsoft/Phi-3-mini-4k-instruct", # Proven stable "microsoft/Phi-3.5-mini-instruct", # Works well "Qwen/Qwen2.5-3B-Instruct", # Just tested 55.6%! ⭐ "Qwen/Qwen2.5-7B-Instruct", # Good performance "deepseek-ai/deepseek-llm-7b-chat", # Previously tested 55%+ "TinyLlama/TinyLlama-1.1B-Chat-v1.0", # Previously tested 33%+ ], "🛡️ Cybersecurity-Focused": [ "deepseek-ai/deepseek-coder-6.7b-instruct", # Code security "WizardLM/WizardCoder-Python-7B-V1.0", # Wizard Coder "bigcode/starcoder2-7b", # StarCoder2 "meta-llama/CodeLlama-7b-Instruct-hf", # CodeLlama "Salesforce/codegen25-7b-instruct", # CodeGen ], "Small Models (1-4B)": [ "microsoft/Phi-3-mini-4k-instruct", "microsoft/Phi-3.5-mini-instruct", "Qwen/Qwen2.5-3B-Instruct", "TinyLlama/TinyLlama-1.1B-Chat-v1.0", # Removed gated models: google/gemma-2-2b-it, meta-llama/Llama-3.2-3B-Instruct # Removed: stabilityai/stablelm-2-1_6b-chat (0% accuracy) ], "Medium Models (7-12B)": [ "mistralai/Mistral-7B-Instruct-v0.3", "Qwen/Qwen2.5-7B-Instruct", "mistralai/Mistral-Nemo-Instruct-2407", "01-ai/Yi-1.5-9B-Chat", # Removed gated models: meta-llama/Llama-3.1-8B-Instruct, google/gemma-2-9b-it ], "Reasoning & Analysis": [ "deepseek-ai/deepseek-llm-7b-chat", "upstage/SOLAR-10.7B-Instruct-v1.0", "NousResearch/Hermes-3-Llama-3.1-8B", "Qwen/Qwen2.5-14B-Instruct", ], 
# Global state
current_results = []
dataset_cache = None
PERSISTENT_RESULTS_FILE = "persistent_results.json"


def load_persistent_results():
    """Load the persisted leaderboard from disk.

    Returns:
        list: Result dicts that carry a ``'model'`` key. An empty list is
        returned when the file is missing, unreadable, not valid JSON, or
        does not contain a JSON list.
    """
    try:
        with open(PERSISTENT_RESULTS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as e:  # json.JSONDecodeError is a ValueError
        print(f"Error loading persistent results: {e}")
        return []

    if not isinstance(data, list):
        print("Error loading persistent results: expected a JSON list")
        return []
    # merge_results indexes every entry by 'model'; drop anything malformed here
    # so one corrupt record cannot break the whole leaderboard.
    return [entry for entry in data if isinstance(entry, dict) and 'model' in entry]


def save_persistent_results(results):
    """Save results to persistent storage without risking the existing file.

    The JSON is written to a sibling temporary file and then moved over the
    real file, so a serialisation or I/O failure leaves the previous
    leaderboard intact. Failures are reported on stdout, never raised.

    Args:
        results: JSON-serialisable list of result dicts.
    """
    import os

    target = Path(PERSISTENT_RESULTS_FILE)
    scratch = target.with_name(target.name + ".tmp")
    try:
        with open(scratch, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        os.replace(scratch, target)
    except (OSError, TypeError, ValueError) as e:
        print(f"Error saving persistent results: {e}")
        try:
            scratch.unlink(missing_ok=True)
        except OSError:
            pass  # nothing more we can do about a stray temp file


def _supersedes(candidate, incumbent):
    """Return True when ``candidate`` should replace ``incumbent`` for the same model."""
    candidate_failed = 'error' in candidate
    incumbent_failed = 'error' in incumbent
    if candidate_failed != incumbent_failed:
        # A completed run always beats an errored placeholder, even at 0%.
        return incumbent_failed
    return (candidate.get('overall_accuracy', 0) or 0) > (incumbent.get('overall_accuracy', 0) or 0)


def merge_results(existing_results, new_results):
    """Merge new results with existing ones, keeping the best entry per model.

    Args:
        existing_results: Previously stored result dicts.
        new_results: Result dicts from the current run.

    Returns:
        list: One dict per model, sorted by ``'overall_accuracy'`` descending;
        at equal accuracy, completed runs are listed before errored ones.
    """
    best = {r['model']: r for r in existing_results}

    for candidate in new_results:
        name = candidate['model']
        incumbent = best.get(name)
        if incumbent is None or _supersedes(candidate, incumbent):
            best[name] = candidate

    return sorted(
        best.values(),
        key=lambda r: (r.get('overall_accuracy', 0) or 0, 'error' not in r),
        reverse=True,
    )


def clear_persistent_results():
    """Delete the persisted leaderboard and reset the in-memory copy.

    Returns:
        tuple: ``(status_markdown, empty_table, None, None)`` matching the
        status / table / plot / download outputs of the clear button.
    """
    global current_results
    try:
        # missing_ok avoids the exists()/unlink() race of checking first.
        Path(PERSISTENT_RESULTS_FILE).unlink(missing_ok=True)
    except OSError as e:
        return (
            f"❌ Error clearing results: {e}",
            pd.DataFrame(),
            None,
            None
        )

    current_results = []
    # Return empty displays
    return (
        "✅ Persistent results cleared!",
        pd.DataFrame(),
        None,
        None
    )
def load_initial_leaderboard():
    """Load and display the persistent leaderboard on startup.

    Returns:
        tuple: ``(table, chart, download_path)`` built from the stored
        results, or ``(empty DataFrame, None, None)`` when nothing is stored.
    """
    persistent_results = load_persistent_results()
    if not persistent_results:
        return pd.DataFrame(), None, None

    return (
        format_results_table(persistent_results),
        create_comparison_chart(persistent_results),
        create_download_data(persistent_results),
    )


def _load_benchmark_subset(subset):
    """Download and concatenate every category file of one benchmark subset."""
    from datasets import concatenate_datasets

    # Manually specify the categories to avoid globbing issues
    categories = [
        "knowledge_terminology",
        "knowledge_threat_intelligence",
        "regulatory_essential_eight",
        "regulatory_ism_controls",
        "regulatory_privacy_act",
        "regulatory_soci_act"
    ]
    # Columns shared by all files; anything else is per-file metadata that
    # would make the schemas incompatible when concatenating.
    shared_columns = {
        'task_id', 'category', 'subcategory', 'title', 'description',
        'task_type', 'difficulty', 'answer', 'options', 'context',
        'australian_focus', 'regulatory_references'
    }

    parts = []
    for category in categories:
        try:
            ds = load_dataset(
                "json",
                data_files=f"hf://datasets/Zen0/AusCyberBench/data/{subset}/{category}.jsonl",
                split="train"
            )
            extra_columns = [col for col in ds.column_names if col not in shared_columns]
            if extra_columns:
                ds = ds.remove_columns(extra_columns)
            parts.append(ds)
        except Exception as e:
            # Best effort: a missing category should not block the others.
            print(f"Warning: Could not load {category}: {e}")

    if not parts:
        raise RuntimeError(f"Could not load any AusCyberBench data for subset '{subset}'")
    return concatenate_datasets(parts)


def load_benchmark_dataset(subset="australian", num_samples=200):
    """Load the AusCyberBench dataset and draw a proportional sample.

    Each category contributes roughly in proportion to its size (at least one
    task). Because the per-category quotas are rounded down, any shortfall is
    topped up from the tasks that were not picked, so the result has exactly
    ``num_samples`` tasks whenever the dataset is large enough. Sampling uses
    a private RNG seeded with 42, so it is reproducible and does not disturb
    the global ``random`` state.

    Args:
        subset: Dataset sub-folder to load. Loaded data is cached per subset.
        num_samples: Maximum number of tasks to return.

    Returns:
        list: Shuffled task dicts, at most ``num_samples`` long.

    Raises:
        RuntimeError: If no category file of the subset could be loaded.
    """
    global dataset_cache
    import random

    # Cache per subset; a single shared cache would silently serve the first
    # subset that was ever requested.
    if not isinstance(dataset_cache, dict):
        dataset_cache = {}
    if subset not in dataset_cache:
        dataset_cache[subset] = _load_benchmark_subset(subset)
    dataset = dataset_cache[subset]

    num_samples = int(num_samples)
    if num_samples <= 0:
        return []

    # Proportional sampling
    rng = random.Random(42)
    by_category = defaultdict(list)
    for item in dataset:
        by_category[item['category']].append(item)

    total = len(dataset)
    samples = []
    leftovers = []
    for items in by_category.values():
        quota = max(1, int(len(items) / total * num_samples))
        if len(items) <= quota:
            samples.extend(items)
            continue
        chosen = set(rng.sample(range(len(items)), quota))
        for idx, item in enumerate(items):
            (samples if idx in chosen else leftovers).append(item)

    shortfall = num_samples - len(samples)
    if shortfall > 0 and leftovers:
        samples.extend(rng.sample(leftovers, min(shortfall, len(leftovers))))

    rng.shuffle(samples)
    return samples[:num_samples]
def format_prompt(task, model_name):
    """Format a task as a prompt wrapped in a model-family chat template.

    Args:
        task: Task dict with a ``'description'`` and, for multiple-choice
            tasks, a list of ``{'id', 'text'}`` dicts under ``'options'``.
        model_name: Hugging Face repo id; the substrings ``'phi'`` and
            ``'gemma'`` select the matching template, everything else gets
            the ``[INST]`` wrapper.

    Returns:
        str: The prompt text. Tasks that are not multiple choice, or that
        have no options, get the bare question in an ``[INST]`` wrapper.
    """
    question = task['description']

    # .get() also covers options=None, which a plain `'options' in task`
    # check would let through to the loop below.
    options = task.get('options') if task.get('task_type') == 'multiple_choice' else None
    if not options:
        return f"""[INST] {question} [/INST]"""

    options_text = "\n".join(f"{opt['id']}. {opt['text']}" for opt in options)
    body = (
        f"{question}\n\n"
        f"{options_text}\n\n"
        "Respond with ONLY the letter of the correct answer (A, B, C, or D)."
    )

    family = model_name.lower()
    if 'phi' in family:
        return f"<|user|>\n{body}<|end|>\n<|assistant|>"
    if 'gemma' in family:
        # Gemma's chat format delimits every turn with explicit turn markers.
        return f"<start_of_turn>user\n{body}<end_of_turn>\n<start_of_turn>model\n"
    return f"[INST] {body} [/INST]"


# Answer-extraction patterns, ordered from most to least specific.
_ANSWER_CUE_RE = re.compile(
    r'\b(?:answer|choice)(?:\s+is)?\s*:?\s*\(?\**([A-D])\b', re.IGNORECASE
)
_LEADING_LETTER_RE = re.compile(r'''^[\s(\[*"']*([A-Da-d])(?=[.):,\]*"']|\s*$)''')
_UPPER_LETTER_RE = re.compile(r'\b([A-D])\b')
_ANY_CASE_LETTER_RE = re.compile(r'\b([A-D])\b', re.IGNORECASE)


def extract_answer(response, task):
    """Extract the answer from a model response.

    For multiple-choice tasks the patterns are tried from most to least
    specific, so an explicit statement wins over an incidental letter:

    1. ``answer is X`` / ``Answer: X`` / ``choice X``
    2. a letter opening the response (``B``, ``b)``, ``(C)``, ``**A**``)
    3. the first stand-alone upper-case letter
    4. the first stand-alone letter of either case

    Letters embedded in words are never used.

    Args:
        response: Raw decoded model output.
        task: Task dict; ``'task_type'`` decides how the response is read.

    Returns:
        str: Upper-case letter ``A``-``D`` (or ``""`` when none is found) for
        multiple-choice tasks; otherwise the first 100 characters of the
        stripped response.
    """
    response = response.strip()

    if task.get('task_type') != 'multiple_choice':
        return response[:100]

    for pattern in (_ANSWER_CUE_RE, _LEADING_LETTER_RE, _UPPER_LETTER_RE, _ANY_CASE_LETTER_RE):
        match = pattern.search(response)
        if match:
            return match.group(1).upper()
    return ""
def cleanup_model(model, tokenizer):
    """Release this function's references and reclaim host and GPU memory.

    Only the references passed in are dropped here; memory is actually freed
    once the caller has let go of its own references as well.

    Args:
        model: Model object to release, or ``None``.
        tokenizer: Tokenizer object to release, or ``None``.
    """
    del model, tokenizer

    # Collect first: empty_cache() can only hand back blocks whose tensors
    # have already been destroyed, so running it before the collector leaves
    # cyclic garbage parked in the CUDA caching allocator.
    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
def _append_debug_log(message, path='/tmp/debug_responses.txt'):
    """Print a debug message and append it to the debug file (best effort)."""
    print(message, flush=True)
    try:
        with open(path, 'a') as f:
            f.write(message)
    except OSError:
        # Debug output must never change a task's score or abort the run.
        pass


@spaces.GPU  # Uses default 60s duration (ZeroGPU free tier limit)
def evaluate_single_model(model_name, tasks, use_4bit=True, temperature=0.7, max_tokens=128, progress=gr.Progress()):
    """Evaluate a single model on the benchmark.

    Args:
        model_name: Hugging Face repo id of the model to load.
        tasks: Task dicts to evaluate.
        use_4bit: Load the model with 4-bit NF4 quantisation when True,
            otherwise in float16.
        temperature: Sampling temperature passed to ``generate``.
        max_tokens: Maximum number of new tokens generated per task.
        progress: Gradio progress tracker.

    Returns:
        dict: On success ``'model'``, ``'overall_accuracy'`` (percent),
        ``'total_correct'``, ``'total_tasks'``, ``'category_scores'`` and
        ``'detailed_results'``. If loading or evaluation fails as a whole, a
        dict with ``'model'``, ``'error'``, zeroed scores and
        ``'total_tasks'``. A failure inside one task only marks that task as
        incorrect.
    """
    import traceback

    short_name = model_name.split('/')[-1]
    progress(0, desc=f"Loading {short_name}...")

    model = None
    tokenizer = None
    try:
        if not tasks:
            # Fail before the expensive model load rather than with a
            # ZeroDivisionError when computing the accuracy.
            raise ValueError("No tasks to evaluate")

        # Load model
        if use_4bit:
            quant_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )
        else:
            quant_config = None

        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=False)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quant_config,
            device_map="auto",
            trust_remote_code=True,
            torch_dtype=torch.float16 if not use_4bit else None
        )

        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        progress(0.1, desc=f"Evaluating {short_name}...")

        # Evaluate tasks
        results = []
        for i, task in enumerate(tasks):
            progress((0.1 + 0.8 * i / len(tasks)), desc=f"Task {i+1}/{len(tasks)}")

            # Normalise the key up front so a missing/None answer cannot raise.
            correct = str(task.get('answer') or '').strip()
            predicted = ''
            is_correct = False

            try:
                prompt = format_prompt(task, model_name)

                if i == 0:
                    debug_msg = f"\n{'='*60}\nDEBUG FIRST TASK\n{'='*60}\n"
                    debug_msg += f"Prompt length: {len(prompt)} chars\n"
                    debug_msg += f"Prompt preview: {prompt[:200]}...\n"
                    print(debug_msg, flush=True)

                inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
                if 'token_type_ids' in inputs:
                    inputs.pop('token_type_ids')

                if i == 0:
                    print(f"Input shape: {inputs['input_ids'].shape}", flush=True)

                with torch.no_grad():
                    outputs = model.generate(
                        **inputs,
                        max_new_tokens=max_tokens,
                        temperature=temperature,
                        do_sample=True,
                        top_p=0.9,
                        pad_token_id=tokenizer.eos_token_id,
                        use_cache=False  # Disable KV cache to avoid DynamicCache compatibility issues
                    )

                if i == 0:
                    print(f"Output shape: {outputs.shape}", flush=True)
                    print(f"Input length: {inputs['input_ids'].shape[1]}", flush=True)

                # Decode only the newly generated tokens, not the echoed prompt.
                response = tokenizer.decode(
                    outputs[0][inputs['input_ids'].shape[1]:],
                    skip_special_tokens=True
                )

                if i < 3:
                    _append_debug_log(f"\n>>> TASK {i} RESPONSE: '{response}' (len={len(response)})\n")

                predicted = extract_answer(response, task)
                # An empty prediction is never correct, even if the answer key
                # happens to be empty as well.
                is_correct = bool(predicted) and predicted.strip().upper() == correct.upper()

                if i < 3:
                    _append_debug_log(
                        f">>> TASK {i} EXTRACT: predicted='{predicted}', correct='{correct}', match={is_correct}\n"
                    )

            except Exception as e:
                # One broken task is scored as wrong; the run continues.
                _append_debug_log(f"\n!!! EXCEPTION in task {i}: {str(e)}\n{traceback.format_exc()}\n")
                predicted = ''
                is_correct = False

            results.append({
                'task_id': task.get('task_id'),
                'category': task.get('category'),
                'predicted': predicted,
                'correct': correct,
                'is_correct': is_correct
            })

        # Calculate metrics
        total_correct = sum(1 for r in results if r['is_correct'])
        overall_accuracy = (total_correct / len(results)) * 100

        category_stats = defaultdict(lambda: {'correct': 0, 'total': 0})
        for result in results:
            stats = category_stats[result['category']]
            stats['total'] += 1
            if result['is_correct']:
                stats['correct'] += 1

        category_scores = {
            cat: (stats['correct'] / stats['total']) * 100
            for cat, stats in category_stats.items()
        }

        progress(1.0, desc="Complete!")

        return {
            'model': model_name,
            'overall_accuracy': overall_accuracy,
            'total_correct': total_correct,
            'total_tasks': len(results),
            'category_scores': category_scores,
            'detailed_results': results
        }

    except Exception as e:
        return {
            'model': model_name,
            'error': str(e),
            'overall_accuracy': 0,
            'total_correct': 0,
            'total_tasks': len(tasks)
        }

    finally:
        # Drop this frame's references first: cleanup can only reclaim memory
        # that nothing else still points to.
        model = None
        tokenizer = None
        cleanup_model(None, None)
def run_evaluation(selected_models, num_samples, use_4bit, temperature, max_tokens, progress=gr.Progress()):
    """Run the benchmark on the selected models, streaming leaderboard updates.

    This is a generator: after every model it yields the merged leaderboard
    (stored results plus this run) so the UI refreshes while later models are
    still running. The download file is produced only with the final yield.

    Args:
        selected_models: Hugging Face repo ids to evaluate; repeats are
            evaluated once.
        num_samples: Number of benchmark tasks to sample.
        use_4bit: Whether to load models with 4-bit quantisation.
        temperature: Sampling temperature.
        max_tokens: Maximum new tokens per task.
        progress: Gradio progress tracker.

    Yields:
        tuple: ``(results_table, comparison_chart, download_path_or_None)``.
    """
    global current_results

    if not selected_models:
        # In a generator a `return <value>` never reaches Gradio, so raise a
        # toast instead and leave the three outputs untouched.
        gr.Warning("Please select at least one model to evaluate.")
        yield gr.update(), gr.update(), gr.update()
        return

    # The same model can be ticked under several categories; evaluate it once.
    models = list(dict.fromkeys(selected_models))

    # Load existing persistent results
    persistent_results = load_persistent_results()

    # Load dataset
    progress(0, desc="Loading AusCyberBench dataset...")
    tasks = load_benchmark_dataset(num_samples=num_samples)

    # Evaluate each model
    new_results = []
    for i, model_name in enumerate(models):
        progress((i / len(models)), desc=f"Model {i+1}/{len(models)}")

        try:
            result = evaluate_single_model(
                model_name, tasks, use_4bit, temperature, max_tokens, progress
            )
        except Exception as e:
            # Failures raised outside the worker itself (e.g. an aborted GPU
            # task) become an error row so the remaining models still run.
            result = {
                'model': model_name,
                'error': str(e),
                'overall_accuracy': 0,
                'total_correct': 0,
                'total_tasks': len(tasks)
            }
        new_results.append(result)

        # Merge with persistent results after each model, so finished work
        # survives a later timeout.
        current_results = merge_results(persistent_results, new_results)
        save_persistent_results(current_results)

        # Yield intermediate results (showing full leaderboard including historical)
        yield format_results_table(current_results), create_comparison_chart(current_results), None

    # Final results: same leaderboard as the last intermediate yield, now with
    # the downloadable JSON attached.
    yield (
        format_results_table(current_results),
        create_comparison_chart(current_results),
        create_download_data(current_results),
    )
def format_results_table(results):
    """Format results as a DataFrame for display.

    Rows are ordered by accuracy, highest first; at equal accuracy completed
    runs come before errored ones. Medals go to the first three *completed*
    runs only, and errored runs keep their ``❌`` marker.

    Args:
        results: Result dicts as produced by the evaluation.

    Returns:
        pandas.DataFrame: Columns ``Rank``, ``Model``, ``Accuracy``,
        ``Correct/Total`` and ``Status``; an empty frame when there are no
        results.
    """
    if not results:
        return pd.DataFrame()

    medals = ['🥇', '🥈', '🥉']

    def sort_key(result):
        failed = 'error' in result
        accuracy = 0.0 if failed else float(result.get('overall_accuracy', 0) or 0)
        return (-accuracy, failed)

    rows = []
    completed = 0
    # sorted() is stable, so ties keep their incoming order.
    for result in sorted(results, key=sort_key):
        short_name = result['model'].split('/')[-1]
        if 'error' in result:
            rows.append({
                'Rank': '❌',
                'Model': short_name,
                'Accuracy': '0.0%',
                'Correct/Total': f"0/{result.get('total_tasks', 0)}",
                'Status': f"Error: {str(result['error'])[:50]}"
            })
            continue

        rows.append({
            'Rank': medals[completed] if completed < len(medals) else '',
            'Model': short_name,
            'Accuracy': f"{result['overall_accuracy']:.1f}%",
            'Correct/Total': f"{result['total_correct']}/{result['total_tasks']}",
            'Status': '✓ Complete'
        })
        completed += 1

    return pd.DataFrame(rows, columns=['Rank', 'Model', 'Accuracy', 'Correct/Total', 'Status'])
def create_comparison_chart(results):
    """Create a horizontal bar chart of model accuracies in Australian colours.

    Args:
        results: Result dicts; entries with an ``'error'`` key are ignored.

    Returns:
        matplotlib.figure.Figure | None: The rendered figure, or ``None``
        when there is no completed result to plot. The figure is detached
        from pyplot before it is returned, so repeated calls do not
        accumulate open figures.
    """
    completed = [r for r in (results or []) if 'error' not in r]
    if not completed:
        return None

    # Sort by accuracy
    ranked = sorted(completed, key=lambda r: r['overall_accuracy'], reverse=True)
    models = [r['model'].split('/')[-1] for r in ranked]
    accuracies = [r['overall_accuracy'] for r in ranked]

    # Create figure with Australian colors
    fig, ax = plt.subplots(figsize=(14, max(7, len(models) * 0.45)))

    # Top performer gets gold, the next two bright green, the rest green
    colors = [
        AUSSIE_GOLD if position == 0 else '#00A86B' if position < 3 else AUSSIE_GREEN
        for position in range(len(models))
    ]

    ax.barh(models, accuracies, color=colors, edgecolor='black', linewidth=0.5)

    # Add accuracy labels
    for position, acc in enumerate(accuracies):
        ax.text(acc + 1.5, position, f'{acc:.1f}%',
                va='center', fontweight='bold', fontsize=10)

    # Styling
    ax.set_xlabel('Accuracy (%)', fontsize=13, fontweight='bold')
    ax.set_title('AusCyberBench: Model Performance Ranking',
                 fontsize=15, fontweight='bold', pad=20)
    ax.set_xlim(0, min(100, max(accuracies) + 10))
    ax.grid(axis='x', alpha=0.3, linestyle='--')
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)

    # Add background color
    ax.set_facecolor('#f9f9f9')

    fig.tight_layout()
    # Unregister from pyplot: the chart is rebuilt after every model, and
    # pyplot otherwise keeps every figure alive for the life of the process.
    plt.close(fig)
    return fig


def create_download_data(results):
    """Write the results to a JSON file for download.

    Args:
        results: JSON-serialisable list of result dicts.

    Returns:
        str | None: Path of the written file (``auscyberbench_results.json``
        in the working directory), or ``None`` when there are no results.
    """
    if not results:
        return None

    # Create comprehensive results JSON
    output = {
        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
        'benchmark': 'AusCyberBench',
        'results': results
    }

    # Save to file
    output_path = 'auscyberbench_results.json'
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)

    return output_path
# Build Gradio interface
# Counts shown in the UI are derived from MODELS_BY_CATEGORY so the labels
# cannot drift from the model list. Categories overlap, so the number of
# distinct models is smaller than the number of checkboxes.
_unique_model_count = len(dict.fromkeys(ALL_MODELS))
_recommended_count = len(MODELS_BY_CATEGORY["✅ Recommended (Tested)"])
_security_count = len(MODELS_BY_CATEGORY["🛡️ Cybersecurity-Focused"])
_small_count = len(MODELS_BY_CATEGORY["Small Models (1-4B)"])
_medium_count = len(MODELS_BY_CATEGORY["Medium Models (7-12B)"])

with gr.Blocks(title="AusCyberBench Evaluation Dashboard", theme=gr.themes.Soft()) as app:
    gr.Markdown(f"""
    # 🇦🇺 AusCyberBench Evaluation Dashboard

    **Australia's First LLM Cybersecurity Benchmark** • 13,449 Tasks • {_unique_model_count} Open Models

    Evaluate proven open language models on Australian cybersecurity knowledge including
    Essential Eight, ISM Controls, Privacy Act, SOCI Act, and ACSC Threat Intelligence.

    ✅ **Recommended models** have been tested: Qwen2.5-3B (55.6%), DeepSeek (55%), TinyLlama (33%)
    """)

    # Settings section at top for better UX
    gr.Markdown("## ⚙️ Evaluation Settings")
    with gr.Row():
        num_samples = gr.Slider(10, 500, value=10, step=10, label="Number of Tasks (10 recommended)")
        use_4bit = gr.Checkbox(label="Use 4-bit Quantisation", value=True)
    with gr.Row():
        temperature = gr.Slider(0.1, 1.0, value=0.7, step=0.1, label="Temperature")
        max_tokens = gr.Slider(8, 256, value=32, step=8, label="Max New Tokens")

    run_btn = gr.Button("🚀 Run Evaluation", variant="primary", size="lg")

    gr.Markdown("---")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📋 Model Selection")
            gr.Markdown("""
            **💾 Persistent Results:** Run 1-2 models at a time to avoid GPU timeouts.
            Results merge with the leaderboard automatically!
            """)

            # Quick selection buttons
            with gr.Row():
                btn_recommended = gr.Button(f"✅ Recommended ({_recommended_count})", size="sm", variant="primary")
                btn_security = gr.Button(f"🛡️ Security ({_security_count})", size="sm", variant="secondary")

            with gr.Row():
                btn_small = gr.Button(f"Small ({_small_count})", size="sm")
                btn_medium = gr.Button(f"Medium ({_medium_count})", size="sm")

            with gr.Row():
                btn_all = gr.Button(f"Select All ({_unique_model_count})", size="sm")
                btn_clear = gr.Button("Clear All", size="sm")

            # Model checkboxes by category; each entry pairs the component
            # with the full repo id it stands for.
            model_checkboxes = []
            for category, models in MODELS_BY_CATEGORY.items():
                gr.Markdown(f"**{category}**")
                for model in models:
                    short_name = model.split('/')[-1]
                    cb = gr.Checkbox(label=f"{short_name}", value=False)
                    model_checkboxes.append((cb, model))

            gr.Markdown("### ⚡ GPU Limits")
            gr.Markdown("""
            **Free tier: 60-second limit**
            - ✅ 1-2 models: Safe
            - ⚠️ 3-5 models: May timeout
            - ❌ 6+ models: Will timeout
            """)

        with gr.Column(scale=2):
            gr.Markdown("### 📊 Persistent Leaderboard")
            gr.Markdown("""
            **💾 Results persist across sessions!** Run models one at a time to build up a complete leaderboard.
            - New runs merge with existing results
            - Best score per model is kept
            - Perfect for avoiding GPU timeouts
            """)

            clear_status = gr.Markdown("")
            clear_btn = gr.Button("🗑️ Clear All Results", size="sm", variant="stop")

            results_table = gr.Dataframe(
                label="Leaderboard",
                headers=["Rank", "Model", "Accuracy", "Correct/Total", "Status"],
                interactive=False
            )
            comparison_plot = gr.Plot(label="Model Comparison")
            download_file = gr.File(label="Download Results (JSON)")

    checkbox_components = [cb for cb, _ in model_checkboxes]

    # Quick select button actions
    def _category_selector(category):
        """Build a callback that ticks exactly the models of one category."""
        members = set(MODELS_BY_CATEGORY[category])

        def select():
            return [gr.update(value=(model in members)) for _, model in model_checkboxes]

        return select

    select_recommended = _category_selector("✅ Recommended (Tested)")
    select_security = _category_selector("🛡️ Cybersecurity-Focused")
    select_small = _category_selector("Small Models (1-4B)")
    select_medium = _category_selector("Medium Models (7-12B)")

    def select_all():
        """Tick every model checkbox."""
        return [gr.update(value=True) for _ in model_checkboxes]

    def clear_all():
        """Untick every model checkbox."""
        return [gr.update(value=False) for _ in model_checkboxes]

    btn_recommended.click(select_recommended, outputs=checkbox_components)
    btn_security.click(select_security, outputs=checkbox_components)
    btn_small.click(select_small, outputs=checkbox_components)
    btn_medium.click(select_medium, outputs=checkbox_components)
    btn_all.click(select_all, outputs=checkbox_components)
    btn_clear.click(clear_all, outputs=checkbox_components)

    # Run evaluation
    def prepare_evaluation(*checkbox_values):
        """Map checkbox states to the distinct repo ids that are ticked.

        A model listed under several categories has several checkboxes;
        de-duplicating (order preserved) keeps it from being evaluated more
        than once per run.
        """
        ticked = (
            model
            for (_, model), is_ticked in zip(model_checkboxes, checkbox_values)
            if is_ticked
        )
        return list(dict.fromkeys(ticked))

    def evaluation_wrapper(*args):
        """Wrapper to handle checkbox inputs and call run_evaluation as generator.

        ``args`` holds one value per checkbox followed by the four settings:
        number of tasks, 4-bit flag, temperature and max new tokens.
        """
        selected = prepare_evaluation(*args[:-4])
        yield from run_evaluation(
            selected, int(args[-4]), args[-3], args[-2], int(args[-1])
        )

    run_btn.click(
        fn=evaluation_wrapper,
        inputs=checkbox_components + [num_samples, use_4bit, temperature, max_tokens],
        outputs=[results_table, comparison_plot, download_file]
    )

    # Clear results button
    clear_btn.click(
        fn=clear_persistent_results,
        outputs=[clear_status, results_table, comparison_plot, download_file]
    )

    # Load persistent leaderboard on startup
    app.load(
        fn=load_initial_leaderboard,
        outputs=[results_table, comparison_plot, download_file]
    )

    gr.Markdown(f"""
    ---
    **Dataset:** [Zen0/AusCyberBench](https://huggingface.co/datasets/Zen0/AusCyberBench) • 13,449 tasks |
    **Models:** {_unique_model_count} open LLMs (no gated models) |
    **License:** MIT
    """)

if __name__ == "__main__":
    app.queue().launch()