""" |
|
Utilities for PEFT benchmarking. |
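
Example usage (a sketch; the experiment path and values below are hypothetical):

    experiment_name, config = validate_experiment_path("experiments/lora/llama-3-8b")
    result = BenchmarkResult(
        experiment_name=experiment_name, status=BenchmarkStatus.RUNNING, model_id=config.model_id
    )
    result.update_run_info(duration=12.3, status=BenchmarkStatus.SUCCESS)
    log_results(experiment_name, result)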
"""

import datetime
import json
import os
import platform
import subprocess
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

import psutil
import torch

from peft.utils import infer_device


FILE_NAME_BENCHMARK_PARAMS = "benchmark_params.json"
FILE_NAME_DEFAULT_CONFIG = "default_benchmark_params.json"

RESULT_PATH = os.path.join(os.path.dirname(__file__), "results")
RESULT_PATH_TEMP = os.path.join(os.path.dirname(__file__), "temporary_results")
RESULT_PATH_CANCELLED = os.path.join(os.path.dirname(__file__), "cancelled_results")


class BenchmarkStatus(Enum):
    """Status of a benchmark run."""

    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RUNNING = "running"


@dataclass
class BenchmarkResult:
    """Container for benchmark results."""

    experiment_name: str
    status: BenchmarkStatus

    model_id: str

    run_info: dict = field(default_factory=dict)
    generation_info: dict = field(default_factory=dict)
    meta_info: dict = field(default_factory=dict)

    def __post_init__(self):
        """Initialize structured data format."""
        device = infer_device()
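        # Resolve the torch accelerator module (e.g. torch.cuda or torch.xpu) for the inferred device;
        # fall back to torch.cuda if torch has no submodule of that name.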
        torch_accelerator_module = getattr(torch, device, torch.cuda)
        self.run_info = {
            "timestamp": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
            "duration": 0.0,
            "status": self.status.value,
            "hardware": {
                "num_accelerators": torch_accelerator_module.device_count() if torch_accelerator_module.is_available() else 0,
                "accelerator_type": torch_accelerator_module.get_device_name(0) if torch_accelerator_module.is_available() else "N/A",
                "cuda_version": torch.version.cuda if torch.cuda.is_available() else "N/A",
                "pytorch_version": torch.__version__,
            },
        }

        self.meta_info = {
            "model_id": self.model_id,
            "parameters": {
                "base_params": 0,
                "trainable_params": 0,
                "total_params": 0,
                "param_ratio": 0.0,
            },
            "model_size": {
                "base_model_size_mb": 0.0,
                "adapter_size_mb": 0.0,
            },
            "package_info": {
                "transformers-version": None,
                "transformers-commit-hash": None,
                "peft-version": None,
                "peft-commit-hash": None,
                "datasets-version": None,
                "datasets-commit-hash": None,
                "bitsandbytes-version": None,
                "bitsandbytes-commit-hash": None,
                "torch-version": torch.__version__,
                "torch-commit-hash": None,
            },
            "system_info": {
                "system": platform.system(),
                "release": platform.release(),
                "version": platform.version(),
                "machine": platform.machine(),
                "processor": platform.processor(),
                "accelerator": torch_accelerator_module.get_device_name(0) if torch_accelerator_module.is_available() else "N/A",
            },
        }

        self.generation_info = {
            "memory": {
                "peak_accelerator_memory_mb": 0.0,
                "peak_ram_memory_mb": 0.0,
                "memory_logs": [],
            },
            "by_category": {},
            "overall": {},
        }

    def update_meta_info(self, param_counts: dict, size_info: dict, package_info: Optional[dict] = None):
        """Update model metadata information."""
        self.meta_info["parameters"].update(param_counts)
        self.meta_info["model_size"].update(size_info)
        if package_info:
            self.meta_info["package_info"].update(package_info)

    def update_generation_info(self, memory_data: Optional[dict] = None, performance_metrics: Optional[dict] = None):
        """Update generation performance information, primarily for memory and high-level performance."""
        if memory_data:
            self.generation_info["memory"].update(memory_data)
        if performance_metrics:
            self.generation_info.update(performance_metrics)

    def add_memory_log(self, stage: str, ram_mb: float, accelerator_allocated_mb: float, accelerator_reserved_mb: float):
        """Add a memory usage log entry to generation_info."""
        self.generation_info["memory"]["memory_logs"].append(
            {
                "stage": stage,
                "ram_mb": ram_mb,
                "accelerator_allocated_mb": accelerator_allocated_mb,
                "accelerator_reserved_mb": accelerator_reserved_mb,
            }
        )

    def add_metrics_for_category(self, category: str, metrics: dict, individual_samples: Optional[list] = None):
        """Add metrics for a specific prompt category under generation_info."""
        category_data = {"metrics": metrics, "samples": individual_samples if individual_samples is not None else []}
        self.generation_info["by_category"][category] = category_data

    def update_run_info(
        self,
        duration: float,
        status: BenchmarkStatus,
        error: Optional[str] = None,
        peft_config: Optional[dict] = None,
        benchmark_config: Optional[dict] = None,
    ):
        """Update run information."""
        self.run_info["duration"] = duration
        self.run_info["status"] = status.value
        if error:
            self.run_info["error"] = error
        if peft_config:
            self.run_info["peft_config"] = peft_config
        if benchmark_config:
            self.run_info["benchmark_config"] = benchmark_config

    def compute_overall_metrics(self):
        """Compute overall metrics across all categories within generation_info."""
        if not self.generation_info["by_category"]:
            return

        categories = self.generation_info["by_category"]
        key_metrics = [
            "inference_time",
            "base_inference_time",
            "inference_overhead_pct",
            "time_per_token",
            "generated_tokens",
        ]

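        # The overall value of each metric is the unweighted mean over all categories that report it.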
        for metric in key_metrics:
            values = []
            for category_data in categories.values():
                if "metrics" in category_data and metric in category_data["metrics"]:
                    values.append(category_data["metrics"][metric])

            if values:
                self.generation_info["overall"][metric] = sum(values) / len(values)

    def to_dict(self) -> dict[str, Any]:
        """Convert result to dictionary."""
        self.compute_overall_metrics()
        return {
            "run_info": self.run_info,
            "generation_info": self.generation_info,
            "meta_info": self.meta_info,
        }

    def save(self, path: Optional[str] = None):
        """Save result to JSON file."""
        if path is None:
            peft_branch = get_peft_branch()
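            # Route the file by outcome and branch: cancelled runs go to the cancelled folder; runs from
            # branches other than main go to the temporary folder; on main, successful runs go to the
            # results folder, failed runs to the cancelled folder, and anything else to the temporary folder.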
            if self.status == BenchmarkStatus.CANCELLED:
                base_path = RESULT_PATH_CANCELLED
            elif peft_branch != "main":
                base_path = RESULT_PATH_TEMP
            elif self.status == BenchmarkStatus.SUCCESS:
                base_path = RESULT_PATH
            elif self.status == BenchmarkStatus.FAILED:
                base_path = RESULT_PATH_CANCELLED
            else:
                base_path = RESULT_PATH_TEMP

            filename = f"{self.experiment_name}.json"
            path = os.path.join(base_path, filename)

        os.makedirs(os.path.dirname(path), exist_ok=True)

        with open(path, "w") as f:
            json.dump(self.to_dict(), f, indent=2)

        return path


@dataclass
class BenchmarkConfig:
    """Configuration for benchmarking PEFT methods."""

    model_id: str

    seed: int
    num_inference_runs: int
    max_new_tokens: int

    dtype: str = "float16"
    use_4bit: bool = False
    use_8bit: bool = False

    category_generation_params: Optional[dict] = None

    def __post_init__(self) -> None:
        """Validate configuration."""
        if not isinstance(self.model_id, str):
            raise ValueError(f"Invalid model_id: {self.model_id}")

        if self.seed < 0:
            raise ValueError(f"Invalid seed: {self.seed}")

        if self.num_inference_runs <= 0:
            raise ValueError(f"Invalid num_inference_runs: {self.num_inference_runs}")

        if self.max_new_tokens <= 0:
            raise ValueError(f"Invalid max_new_tokens: {self.max_new_tokens}")

    @classmethod
    def from_dict(cls, config_dict: dict) -> "BenchmarkConfig":
        """Create config from dictionary."""
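        # Ignore keys that are not dataclass fields so that extra entries in a config file do not
        # break loading.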
        valid_keys = set(cls.__dataclass_fields__.keys())
        filtered_dict = {k: v for k, v in config_dict.items() if k in valid_keys}

        return cls(**filtered_dict)

    @classmethod
    def from_json(cls, json_path: str) -> "BenchmarkConfig":
        """Load config from JSON file."""
        with open(json_path) as f:
            config_dict = json.load(f)
        return cls.from_dict(config_dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert config to dictionary."""
        result = asdict(self)
        return result

    def save(self, path: str) -> None:
        """Save config to JSON file."""
        with open(path, "w") as f:
            json.dump(self.to_dict(), f, indent=2)

    def merge_from_dict(self, config_dict: dict) -> None:
        """Merge settings from a dictionary into this config object.

        Keys in config_dict will override existing attributes.
        """
        for key, value in config_dict.items():
            if hasattr(self, key):
                setattr(self, key, value)


def validate_experiment_path(path: str) -> tuple[str, "BenchmarkConfig"]:
    """Validate experiment path, load and merge configs, and return them."""
    if not os.path.exists(path):
        raise FileNotFoundError(f"Experiment path not found: {path}")

    path_parts = os.path.normpath(path).split(os.sep)

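    # Derive the experiment name from the path: "experiments/<method>/<variant>/..." becomes
    # "<method>--<variant>-..."; paths without an "experiments" component fall back to the last
    # path component.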
    try:
        experiments_idx = path_parts.index("experiments")
    except ValueError:
        experiment_name = os.path.basename(path.rstrip(os.sep))
    else:
        if experiments_idx + 1 < len(path_parts):
            method_name = path_parts[experiments_idx + 1]
            remaining_parts = path_parts[experiments_idx + 2 :]
            if remaining_parts:
                remaining_name = "-".join(remaining_parts)
                experiment_name = f"{method_name}--{remaining_name}"
            else:
                experiment_name = method_name
        else:
            experiment_name = os.path.basename(path.rstrip(os.sep))

    default_config_path = os.path.join(os.path.dirname(__file__), FILE_NAME_DEFAULT_CONFIG)
    experiment_benchmark_params_path = os.path.join(path, FILE_NAME_BENCHMARK_PARAMS)

    if not os.path.exists(default_config_path):
        raise FileNotFoundError(f"Default configuration file not found: {default_config_path}. This is required.")
    benchmark_config = BenchmarkConfig.from_json(default_config_path)
    print(f"Loaded default configuration from {default_config_path}")

    if os.path.exists(experiment_benchmark_params_path):
        with open(experiment_benchmark_params_path) as f:
            experiment_specific_params = json.load(f)

        benchmark_config.merge_from_dict(experiment_specific_params)
        print(f"Loaded and merged experiment-specific parameters from {experiment_benchmark_params_path}")
    else:
        print(f"No {FILE_NAME_BENCHMARK_PARAMS} found in {path}. Using only default configuration.")

    return experiment_name, benchmark_config


def get_memory_usage() -> tuple[float, float, float]:
    """Get current memory usage (RAM and accelerator)."""
    process = psutil.Process(os.getpid())
    ram_usage_bytes = process.memory_info().rss
    ram_usage_mb = ram_usage_bytes / (1024 * 1024)

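    # Accelerator memory is read from CUDA if available, then XPU; on CPU-only systems it is reported as 0.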
    if torch.cuda.is_available():
        accelerator_allocated = torch.cuda.memory_allocated()
        accelerator_reserved = torch.cuda.memory_reserved()
        accelerator_allocated_mb = accelerator_allocated / (1024 * 1024)
        accelerator_reserved_mb = accelerator_reserved / (1024 * 1024)
    elif torch.xpu.is_available():
        accelerator_allocated = torch.xpu.memory_allocated()
        accelerator_reserved = torch.xpu.memory_reserved()
        accelerator_allocated_mb = accelerator_allocated / (1024 * 1024)
        accelerator_reserved_mb = accelerator_reserved / (1024 * 1024)
    else:
        accelerator_allocated_mb = 0.0
        accelerator_reserved_mb = 0.0

    return ram_usage_mb, accelerator_allocated_mb, accelerator_reserved_mb


def init_accelerator() -> tuple[float, float]:
    """Initialize accelerator and return initial memory usage."""
    if torch.cuda.is_available():
        torch.cuda.init()
        torch.cuda.empty_cache()
        _, accelerator_allocated, accelerator_reserved = get_memory_usage()
    elif torch.xpu.is_available():
        torch.xpu.init()
        torch.xpu.empty_cache()
        _, accelerator_allocated, accelerator_reserved = get_memory_usage()
    else:
        accelerator_allocated = 0.0
        accelerator_reserved = 0.0
    return accelerator_allocated, accelerator_reserved


def get_model_size_mb(model: torch.nn.Module, dtype_bytes: int = 4) -> float:
    """Calculate model size in MB."""
    return sum(p.numel() * dtype_bytes for p in model.parameters()) / (1024 * 1024)


def get_peft_branch() -> str:
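    """Return the name of the git branch currently checked out in the repository containing this file."""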
    repo_root = os.path.dirname(__file__)
    return subprocess.check_output("git rev-parse --abbrev-ref HEAD".split(), cwd=repo_root).decode().strip()


def log_results(
    experiment_name: str,
    benchmark_result: BenchmarkResult,
    print_fn: Callable = print,
) -> None:
    """Log benchmark results to console."""
    print_fn("\n" + "=" * 50)
    print_fn(f"Benchmark Results: {experiment_name}")
    print_fn("=" * 50)

    print_fn(f"Status: {benchmark_result.run_info.get('status', 'N/A')}")
    print_fn(f"Duration: {benchmark_result.run_info.get('duration', 0):.2f} seconds")

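    # For runs that did not succeed, only status, duration, and the error message are meaningful,
    # so skip the detailed sections.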
    if benchmark_result.run_info.get("status") != BenchmarkStatus.SUCCESS.value:
        print_fn(f"Error: {benchmark_result.run_info.get('error', 'Unknown error')}")
        print_fn("=" * 50)
        return

print_fn("\nModel Information:") |
|
print_fn(f" Base Model: {benchmark_result.meta_info.get('model_id', 'N/A')}") |
|
|
|
print_fn("\nParameter Counts:") |
|
params = benchmark_result.meta_info.get("parameters", {}) |
|
print_fn(f" Base Parameters: {params.get('base_params', 0):,}") |
|
print_fn(f" Trainable Parameters: {params.get('trainable_params', 0):,}") |
|
print_fn(f" Parameter Ratio: {params.get('param_ratio', 0):.5%}") |
|
|
|
print_fn("\nModel Size:") |
|
size_info = benchmark_result.meta_info.get("model_size", {}) |
|
print_fn(f" Base Model: {size_info.get('base_model_size_mb', 0):.2f} MB") |
|
print_fn(f" Adapter: {size_info.get('adapter_size_mb', 0):.2f} MB") |
|
|
|
print_fn("\nMemory Usage (from generation_info):") |
|
memory_data = benchmark_result.generation_info.get("memory", {}) |
|
print_fn(f" Peak Accelerator Memory: {memory_data.get('peak_accelerator_memory_mb', 0):.2f} MB") |
|
print_fn(f" Peak RAM Memory: {memory_data.get('peak_ram_memory_mb', 0):.2f} MB") |
|
|
|
print_fn("\nDetailed Metrics (from generation_info.by_category):") |
|
if benchmark_result.generation_info.get("by_category"): |
|
for category, cat_data in benchmark_result.generation_info["by_category"].items(): |
|
print_fn(f" Category: {category}") |
|
metrics = cat_data.get("metrics", {}) |
|
print_fn(f" Inference Time: {metrics.get('inference_time', 0):.4f} seconds") |
|
print_fn(f" Base Inference Time: {metrics.get('base_inference_time', 0):.4f} seconds") |
|
print_fn(f" Inference Overhead: {metrics.get('inference_overhead_pct', 0):.2f}%") |
|
print_fn(f" Time Per Token: {metrics.get('time_per_token', 0):.6f} seconds/token") |
|
print_fn(f" Generated Tokens: {metrics.get('generated_tokens', 0):.1f}") |
|
|
|
samples = cat_data.get("samples", []) |
|
if samples: |
|
print_fn(f" Number of Samples: {len(samples)}") |
|
print_fn( |
|
f" Average Generated Tokens: {sum(s.get('generated_tokens', 0) for s in samples) / len(samples):.1f}" |
|
) |
|
else: |
|
print_fn(" No per-category metrics available.") |
|
|
|
benchmark_result.compute_overall_metrics() |
|
|
|
print_fn("\nOverall Metrics (from generation_info.overall):") |
|
overall = benchmark_result.generation_info.get("overall") |
|
if overall: |
|
print_fn(f" Inference Time: {overall.get('inference_time', 0):.4f} seconds") |
|
print_fn(f" Base Inference Time: {overall.get('base_inference_time', 0):.4f} seconds") |
|
print_fn(f" Inference Overhead: {overall.get('inference_overhead_pct', 0):.2f}%") |
|
print_fn(f" Time Per Token: {overall.get('time_per_token', 0):.6f} seconds/token") |
|
print_fn(f" Generated Tokens: {overall.get('generated_tokens', 0):.1f}") |
|
else: |
|
print_fn(" No overall metrics computed.") |
|
|
|
print_fn("\nSaved results to:", benchmark_result.save()) |
|
print_fn("=" * 50) |
|
|