#!/usr/bin/env python3
"""
WrinkleBrane Performance Benchmark Suite

Comprehensive analysis of scaling laws and optimization opportunities.
"""

import sys
import time
from pathlib import Path

# Make the local src/ package importable when running this script directly.
sys.path.append(str(Path(__file__).resolve().parent / "src"))

import numpy as np
import torch

from wrinklebrane.membrane_bank import MembraneBank
from wrinklebrane.codes import hadamard_codes, dct_codes, gaussian_codes
from wrinklebrane.slicer import make_slicer
from wrinklebrane.write_ops import store_pairs
from wrinklebrane.metrics import psnr, spectral_entropy_2d, gzip_ratio


def benchmark_memory_scaling():
    """Benchmark memory usage and performance across different scales."""
    print("šŸ“Š Memory Scaling Benchmark")
    print("=" * 40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Membrane dimensions to test, from small to large.
    configs = [
        {"L": 32, "H": 16, "W": 16, "K": 16, "B": 1},
        {"L": 64, "H": 32, "W": 32, "K": 32, "B": 1},
        {"L": 128, "H": 64, "W": 64, "K": 64, "B": 1},
        {"L": 256, "H": 128, "W": 128, "K": 128, "B": 1},
    ]

    results = []
    for config in configs:
        L, H, W, K, B = config["L"], config["H"], config["W"], config["K"], config["B"]
        print(f"Testing L={L}, H={H}, W={W}, K={K}, B={B}")

        # Memory footprint in bytes (float32 = 4 bytes per element).
        membrane_memory = B * L * H * W * 4
        code_memory = L * K * 4
        total_memory = membrane_memory + code_memory

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)

        patterns = torch.rand(K, H, W, device=device)
        keys = torch.arange(K, device=device)
        alphas = torch.ones(K, device=device)

        # Benchmark write speed; scale the iteration count down for larger configs.
        iterations = max(1, 100 // (L // 32))
        start_time = time.time()
        for _ in range(iterations):
            M = store_pairs(bank.read(), C, keys, patterns, alphas)
            bank.write(M - bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()  # let queued kernels finish before stopping the clock
        write_time = (time.time() - start_time) / iterations

        # Benchmark read speed.
        read_iterations = iterations * 10
        start_time = time.time()
        for _ in range(read_iterations):
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        read_time = (time.time() - start_time) / read_iterations

        # Calculate fidelity of the retrieved patterns.
        readouts = slicer(bank.read()).squeeze(0)
        avg_psnr = 0.0
        for i in range(K):
            avg_psnr += psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy())
        avg_psnr /= K

        result = {
            "config": config,
            "memory_mb": total_memory / 1e6,
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000,
            "write_throughput": K / write_time,
            "read_throughput": K * B / read_time,
            "fidelity_psnr": avg_psnr,
        }
        results.append(result)

        print(f"  Memory: {result['memory_mb']:.2f}MB")
        print(f"  Write: {result['write_time_ms']:.2f}ms ({result['write_throughput']:.0f} patterns/sec)")
        print(f"  Read: {result['read_time_ms']:.2f}ms ({result['read_throughput']:.0f} readouts/sec)")
        print(f"  PSNR: {result['fidelity_psnr']:.1f}dB")
        print()

    return results


def benchmark_capacity_limits():
    """Test WrinkleBrane capacity limits and interference scaling."""
    print("🧮 Capacity Limits Benchmark")
    print("=" * 40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    L, H, W, B = 64, 32, 32, 1

    # Store an increasing number of patterns in a fixed-size bank.
    pattern_counts = [4, 8, 16, 32, 64, 128, 256]
    results = []

    for K in pattern_counts:
        print(f"Testing {K} patterns...")

        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)

        # Generate random patterns.
        patterns = torch.rand(K, H, W, device=device)
        keys = torch.arange(K, device=device)
        alphas = torch.ones(K, device=device)

        # Store patterns.
        M = store_pairs(bank.read(), C, keys, patterns, alphas)
        bank.write(M - bank.read())

        # Measure interference on readout.
        readouts = slicer(bank.read()).squeeze(0)

        # Calculate per-pattern metrics.
        psnr_values = []
        entropy_values = []
        compression_values = []
        for i in range(K):
            psnr_values.append(psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy()))
            entropy_values.append(spectral_entropy_2d(readouts[i]))
            compression_values.append(gzip_ratio(readouts[i]))

        # Theoretical capacity: an L-layer bank holds at most L perfectly orthogonal codes.
        theoretical_capacity = L
        capacity_utilization = K / theoretical_capacity

        result = {
            "K": K,
            "avg_psnr": np.mean(psnr_values),
            "min_psnr": np.min(psnr_values),
            "std_psnr": np.std(psnr_values),
            "avg_entropy": np.mean(entropy_values),
            "avg_compression": np.mean(compression_values),
            "capacity_utilization": capacity_utilization,
        }
        results.append(result)

        print(f"  PSNR: {result['avg_psnr']:.1f}±{result['std_psnr']:.1f}dB (min: {result['min_psnr']:.1f}dB)")
        print(f"  Entropy: {result['avg_entropy']:.3f}")
        print(f"  Compression: {result['avg_compression']:.3f}")
        print(f"  Capacity utilization: {result['capacity_utilization']:.1%}")
        print()

    return results


def benchmark_code_types():
    """Compare performance of different orthogonal code types."""
    print("🧬 Code Types Benchmark")
    print("=" * 40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    L, H, W, K, B = 64, 32, 32, 32, 1

    code_generators = {
        "Hadamard": lambda: hadamard_codes(L, K).to(device),
        "DCT": lambda: dct_codes(L, K).to(device),
        "Gaussian": lambda: gaussian_codes(L, K).to(device),
    }

    results = {}
    patterns = torch.rand(K, H, W, device=device)
    keys = torch.arange(K, device=device)
    alphas = torch.ones(K, device=device)

    for name, code_gen in code_generators.items():
        print(f"Testing {name} codes...")

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = code_gen()
        slicer = make_slicer(C)

        # Measure orthogonality: the Gram matrix C^T C should be close to the identity.
        G = C.T @ C
        I = torch.eye(K, device=device, dtype=C.dtype)
        orthogonality_error = torch.norm(G - I).item()

        # Store and retrieve patterns.
        M = store_pairs(bank.read(), C, keys, patterns, alphas)
        bank.write(M - bank.read())
        readouts = slicer(bank.read()).squeeze(0)

        # Calculate fidelity metrics.
        psnr_values = []
        for i in range(K):
            psnr_values.append(psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy()))

        # Benchmark write speed.
        start_time = time.time()
        for _ in range(100):
            M = store_pairs(bank.read(), C, keys, patterns, alphas)
        if device.type == "cuda":
            torch.cuda.synchronize()
        write_time = (time.time() - start_time) / 100

        # Benchmark read speed.
        start_time = time.time()
        for _ in range(1000):
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        read_time = (time.time() - start_time) / 1000

        result = {
            "orthogonality_error": orthogonality_error,
            "avg_psnr": np.mean(psnr_values),
            "std_psnr": np.std(psnr_values),
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000,
        }
        results[name] = result

        print(f"  Orthogonality error: {result['orthogonality_error']:.6f}")
        print(f"  PSNR: {result['avg_psnr']:.1f}±{result['std_psnr']:.1f}dB")
        print(f"  Write time: {result['write_time_ms']:.3f}ms")
        print(f"  Read time: {result['read_time_ms']:.3f}ms")
        print()

    return results


def benchmark_gpu_acceleration():
    """Compare CPU vs GPU performance if available."""
    print("⚔ GPU Acceleration Benchmark")
    print("=" * 40)

    if not torch.cuda.is_available():
        print("CUDA not available, skipping GPU benchmark")
        return None

    L, H, W, K, B = 128, 64, 64, 64, 4

    patterns = torch.rand(K, H, W)
    keys = torch.arange(K)
    alphas = torch.ones(K)

    devices = [torch.device("cpu"), torch.device("cuda")]
    results = {}

    for device in devices:
        print(f"Testing on {device}...")

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)

        patterns_dev = patterns.to(device)
        keys_dev = keys.to(device)
        alphas_dev = alphas.to(device)

        # Warmup
        for _ in range(10):
            M = store_pairs(bank.read(), C, keys_dev, patterns_dev, alphas_dev)
            bank.write(M - bank.read())
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()

        # Benchmark write
        start_time = time.time()
        for _ in range(100):
            M = store_pairs(bank.read(), C, keys_dev, patterns_dev, alphas_dev)
            bank.write(M - bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        write_time = (time.time() - start_time) / 100

        # Benchmark read
        start_time = time.time()
        for _ in range(1000):
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        read_time = (time.time() - start_time) / 1000

        result = {
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000,
            "write_throughput": K * B / write_time,
            "read_throughput": K * B / read_time,
        }
        results[str(device)] = result

        print(f"  Write: {result['write_time_ms']:.2f}ms ({result['write_throughput']:.0f} patterns/sec)")
        print(f"  Read: {result['read_time_ms']:.2f}ms ({result['read_throughput']:.0f} readouts/sec)")
        print()

    # Calculate speedup
    if len(results) == 2:
        cpu_result = results["cpu"]
        gpu_result = results["cuda"]
        write_speedup = cpu_result["write_time_ms"] / gpu_result["write_time_ms"]
        read_speedup = cpu_result["read_time_ms"] / gpu_result["read_time_ms"]
        print(f"GPU Speedup - Write: {write_speedup:.1f}x, Read: {read_speedup:.1f}x")

    return results


def main():
    """Run the comprehensive WrinkleBrane performance benchmark suite."""
    print("⚔ WrinkleBrane Performance Benchmark Suite")
    print("=" * 50)

    # Set random seeds for reproducibility.
    torch.manual_seed(42)
    np.random.seed(42)

    try:
        memory_results = benchmark_memory_scaling()
        capacity_results = benchmark_capacity_limits()
        code_results = benchmark_code_types()
        gpu_results = benchmark_gpu_acceleration()

        print("=" * 50)
        print("šŸ“ˆ Performance Summary:")
        print("=" * 50)

        # Memory scaling summary
        if memory_results:
            largest = memory_results[-1]
            print("Largest tested configuration:")
            print(f"  L={largest['config']['L']}, Memory: {largest['memory_mb']:.1f}MB")
            print(f"  Write throughput: {largest['write_throughput']:.0f} patterns/sec")
            print(f"  Read throughput: {largest['read_throughput']:.0f} readouts/sec")
            print(f"  Fidelity: {largest['fidelity_psnr']:.1f}dB")

        # Capacity summary
        if capacity_results:
            max_capacity = capacity_results[-1]
            print(f"\nMaximum tested capacity: {max_capacity['K']} patterns")
            print(f"  Average PSNR: {max_capacity['avg_psnr']:.1f}dB")
            print(f"  Capacity utilization: {max_capacity['capacity_utilization']:.1%}")

        # Code comparison summary
        if code_results:
            best_code = min(code_results.items(), key=lambda x: x[1]["orthogonality_error"])
            print(f"\nBest orthogonal codes: {best_code[0]}")
            print(f"  Orthogonality error: {best_code[1]['orthogonality_error']:.6f}")
            print(f"  Average PSNR: {best_code[1]['avg_psnr']:.1f}dB")

        print("\nāœ… WrinkleBrane Performance Analysis Complete!")

    except Exception as e:
        print(f"\nāŒ Benchmark failed with error: {e}")
        import traceback
        traceback.print_exc()
        return False

    return True


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)