#!/usr/bin/env python3
"""
Test MarkovSpline + BitTransformerLM Integration

Validates the integration between MarkovSpline and BitTransformerLM
using actual datasets and training procedures.
"""
import os
import sys
import time
import torch
import json
import numpy as np
from pathlib import Path

# Add MarkovSpline to path
sys.path.insert(0, '/data/MarkovSpline')
from bitpipe_integration import create_markov_spline_bitpipe_module

# BitTransformerLM imports
from bit_transformer.model import BitTransformerLM
from markov_spline_training import MarkovSplineEnhancedTrainer, MarkovSplineEnhancedDataset
from markov_spline_cli import MarkovSplineBitTransformerCLI
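
# NOTE: The hard-coded MarkovSpline path above (and the results path used in
# main()) are environment-specific; adjust them for your local checkout layout.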


# Create simple dataset function
def create_simple_dataset(num_samples=100, seq_length=128):
    """Create simple dataset for testing."""
    dataset = []
    for _ in range(num_samples):
        input_bits = torch.randint(0, 2, (seq_length,), dtype=torch.long)
        target_bits = torch.randint(0, 2, (seq_length,), dtype=torch.long)
        dataset.append({'input_bits': input_bits, 'target_bits': target_bits})
    return dataset
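
# The inputs and targets above are independent random bits, so there is no
# learnable signal; assuming a cross-entropy-style bit objective, losses near
# ln(2) ≈ 0.693 per bit are expected. The suite exercises integration plumbing
# and performance, not model quality.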


# Text to bits converter class
class TextToBitsConverter:
    """Simple text to bits converter for testing."""

    def text_to_bits(self, text, max_length=128):
        """Convert text to bit sequence."""
        # Simple encoding: each character to 8 bits
        bit_sequence = []
        for char in text[:max_length // 8]:
            char_bits = format(ord(char), '08b')
            bit_sequence.extend([int(b) for b in char_bits])
        # Pad or truncate to max_length
        if len(bit_sequence) < max_length:
            bit_sequence.extend([0] * (max_length - len(bit_sequence)))
        else:
            bit_sequence = bit_sequence[:max_length]
        return bit_sequence
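
# Example: 'A' (ord 65) maps to '01000001', i.e. [0, 1, 0, 0, 0, 0, 0, 1].
# With max_length=64 only the first 8 characters fit; shorter texts are
# zero-padded to the full length.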


def test_markov_spline_preprocessing():
    """Test MarkovSpline data preprocessing functionality."""
    print("\n🧪 Testing MarkovSpline Data Preprocessing")
    print("=" * 50)

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create test bit sequences
    converter = TextToBitsConverter()
    test_texts = [
        "The quick brown fox jumps over the lazy dog.",
        "Machine learning transforms raw data into insights.",
        "BitTransformerLM processes information at the bit level.",
        "MarkovSpline provides smooth data transitions."
    ]
    bit_sequences = []
    for text in test_texts:
        bits = converter.text_to_bits(text, max_length=64)
        bit_sequences.append(bits)
    print(f"📝 Created {len(bit_sequences)} test bit sequences")

    # Test binary sequence prediction
    print("\n🔮 Testing Binary Sequence Prediction")
    result = markov_module.process_data(bit_sequences[0], 'predict_binary', num_predictions=8)
    if result['success']:
        print(f" ✅ Prediction successful")
        print(f" 🎯 Predictions: {result['predictions']}")
        print(f" 📊 Avg confidence: {result['prediction_metrics']['avg_confidence']:.3f}")
        print(f" 📈 Entropy: {result['prediction_metrics']['prediction_entropy']:.3f}")
    else:
        print(f" ❌ Prediction failed: {result.get('error', 'Unknown error')}")
        return False

    # Test data preprocessing
    print("\n🔄 Testing Data Preprocessing")
    result = markov_module.process_data(bit_sequences, 'preprocess_training', binary_data=True)
    if result['success']:
        print(f" ✅ Preprocessing successful")
        print(f" 📦 Processed {len(result['processed_sequences'])} sequences")
        print(f" 📋 Summary: {result['preprocessing_summary']}")
    else:
        print(f" ❌ Preprocessing failed: {result.get('error', 'Unknown error')}")
        return False

    return True


def test_enhanced_dataset():
    """Test MarkovSpline enhanced dataset wrapper."""
    print("\n📦 Testing Enhanced Dataset Wrapper")
    print("=" * 50)

    # Create base dataset
    base_dataset = create_simple_dataset(num_samples=20, seq_length=32)
    print(f"📝 Created base dataset with {len(base_dataset)} samples")

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create enhanced dataset
    enhanced_dataset = MarkovSplineEnhancedDataset(
        base_dataset, markov_module, smoothing_strength=0.15, enable_smoothing=True
    )
    print(f"🌊 Created enhanced dataset with MarkovSpline preprocessing")

    # Test data loading
    test_samples = []
    smoothing_success_count = 0
    for i in range(min(5, len(enhanced_dataset))):
        sample = enhanced_dataset[i]
        test_samples.append(sample)
        if sample.get('smoothing_applied', False):
            smoothing_success_count += 1
        print(f" Sample {i}: smoothing_applied = {sample.get('smoothing_applied', False)}")

    success_rate = smoothing_success_count / len(test_samples) if test_samples else 0
    print(f"✅ Smoothing success rate: {success_rate:.2%}")
    return success_rate > 0.5


def test_gradient_smoothing():
    """Test gradient smoothing functionality."""
    print("\n⚡ Testing Gradient Smoothing")
    print("=" * 50)

    # Create small test model
    model = BitTransformerLM(
        d_model=32, nhead=2, num_layers=2,
        dim_feedforward=64, max_seq_len=128
    )

    # Create test data
    batch_size = 4
    seq_length = 64
    test_batch = {
        'input_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long)
    }

    # Initialize enhanced trainer
    trainer = MarkovSplineEnhancedTrainer(
        model=model,
        gradient_smoothing=True,
        data_smoothing=False,
        smoothing_strength=0.2,
        learning_rate=1e-3
    )
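    # With gradient_smoothing=True and data_smoothing=False, only the
    # gradient-smoothing path of the trainer is exercised here; the batch
    # reaches the model unmodified.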
print(f"🧠 Initialized enhanced trainer")
# Test training step with gradient smoothing
start_time = time.time()
metrics = trainer.train_step(test_batch)
end_time = time.time()
print(f" ✅ Training step completed in {end_time - start_time:.3f}s")
print(f" 📊 Loss: {metrics['loss']:.4f}")
print(f" 🌊 Smoothing applied: {metrics.get('smoothing_applied', 0):.3f}")
# Get MarkovSpline metrics
markov_metrics = trainer.get_markov_spline_metrics()
print(f" 📈 MarkovSpline operations: {markov_metrics['processing_operations']}")
return True


def test_cli_interface():
    """Test command-line interface functionality."""
    print("\n💻 Testing CLI Interface")
    print("=" * 50)

    # Initialize CLI
    cli = MarkovSplineBitTransformerCLI()
    if not cli.initialize_markov_spline():
        print(" ❌ CLI initialization failed")
        return False
    print(" ✅ CLI initialized successfully")

    # Test bit sequence smoothing
    test_bits = [0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1]
    result = cli.smooth_bit_sequence(test_bits, 'predict_binary', num_predictions=5)
    if result['success']:
        print(f" ✅ Bit sequence smoothing successful")
        print(f" 🎯 Predictions: {result['predictions']}")
    else:
        print(f" ❌ Bit sequence smoothing failed: {result.get('error', 'Unknown')}")
        return False

    return True


def run_integration_benchmark():
    """Run comprehensive integration benchmark."""
    print("\n🏃 Running Integration Benchmark")
    print("=" * 50)

    # Create test dataset
    dataset = create_simple_dataset(num_samples=50, seq_length=64)

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create enhanced dataset
    enhanced_dataset = MarkovSplineEnhancedDataset(
        dataset, markov_module, smoothing_strength=0.1, enable_smoothing=True
    )

    # Time data loading with and without smoothing
    print("\n⏱️ Benchmarking Data Loading Performance")

    # Benchmark without smoothing
    start_time = time.time()
    for i in range(10):
        sample = dataset[i % len(dataset)]
    base_time = time.time() - start_time

    # Benchmark with smoothing
    start_time = time.time()
    for i in range(10):
        sample = enhanced_dataset[i % len(enhanced_dataset)]
    enhanced_time = time.time() - start_time

    overhead = ((enhanced_time - base_time) / base_time) * 100 if base_time > 0 else 0
    print(f" 📊 Base loading time: {base_time:.3f}s")
    print(f" 🌊 Enhanced loading time: {enhanced_time:.3f}s")
    print(f" 📈 Smoothing overhead: {overhead:.1f}%")

    # Test training performance
    print("\n🏋️ Benchmarking Training Performance")
    model = BitTransformerLM(d_model=64, nhead=4, num_layers=2, dim_feedforward=128, max_seq_len=128)

    # Standard trainer (use the one from markov_spline_training)
    from markov_spline_training import BitwiseTrainer
    standard_trainer = BitwiseTrainer(model, learning_rate=1e-3)

    # Enhanced trainer
    enhanced_trainer = MarkovSplineEnhancedTrainer(
        model, gradient_smoothing=True, data_smoothing=True,
        smoothing_strength=0.15, learning_rate=1e-3
    )

    # Benchmark training step
    test_batch = {
        'input_bits': torch.randint(0, 2, (4, 64), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (4, 64), dtype=torch.long)
    }

    # Standard training step
    start_time = time.time()
    standard_metrics = standard_trainer.train_step(test_batch)
    standard_time = time.time() - start_time

    # Enhanced training step
    start_time = time.time()
    enhanced_metrics = enhanced_trainer.train_step(test_batch)
    enhanced_time = time.time() - start_time

    training_overhead = ((enhanced_time - standard_time) / standard_time) * 100 if standard_time > 0 else 0
    print(f" 📊 Standard training step: {standard_time:.3f}s")
    print(f" 🌊 Enhanced training step: {enhanced_time:.3f}s")
    print(f" 📈 Enhancement overhead: {training_overhead:.1f}%")
    print(f" 🎯 Standard loss: {standard_metrics['loss']:.4f}")
    print(f" 🎯 Enhanced loss: {enhanced_metrics['loss']:.4f}")

    return {
        'data_loading_overhead': overhead,
        'training_overhead': training_overhead,
        'standard_loss': standard_metrics['loss'],
        'enhanced_loss': enhanced_metrics['loss']
    }


def main():
    """Run comprehensive MarkovSpline integration tests."""
    print("🌊 MarkovSpline + BitTransformerLM Integration Tests")
    print("=" * 60)

    results = {
        'preprocessing_test': False,
        'enhanced_dataset_test': False,
        'gradient_smoothing_test': False,
        'cli_interface_test': False,
        'benchmark_results': None
    }

    try:
        # Run individual tests
        results['preprocessing_test'] = test_markov_spline_preprocessing()
        results['enhanced_dataset_test'] = test_enhanced_dataset()
        results['gradient_smoothing_test'] = test_gradient_smoothing()
        results['cli_interface_test'] = test_cli_interface()

        # Run benchmark
        results['benchmark_results'] = run_integration_benchmark()

        # Summary
        print("\n📋 Test Results Summary")
        print("=" * 60)
        passed_tests = 0
        total_tests = 4  # Don't count benchmark as pass/fail
        for test_name, result in results.items():
            if test_name != 'benchmark_results':
                status = "✅ PASSED" if result else "❌ FAILED"
                print(f" {test_name}: {status}")
                if result:
                    passed_tests += 1

        success_rate = (passed_tests / total_tests) * 100
        print(f"\n🎯 Overall Success Rate: {success_rate:.1f}% ({passed_tests}/{total_tests})")

        if results['benchmark_results']:
            benchmark = results['benchmark_results']
            print(f"\n📊 Performance Impact:")
            print(f" - Data loading overhead: {benchmark['data_loading_overhead']:.1f}%")
            print(f" - Training overhead: {benchmark['training_overhead']:.1f}%")
            print(f" - Loss comparison: {benchmark['standard_loss']:.4f} → {benchmark['enhanced_loss']:.4f}")

        # Save results
        results_file = '/data/BitTransformerLM/BitTransformerLM/markov_integration_test_results.json'
        with open(results_file, 'w') as f:
            json.dump(results, f, indent=2, default=str)
        print(f"\n📁 Results saved to: {results_file}")

        if success_rate >= 75:
            print("\n🚀 Integration tests PASSED! Ready for production use.")
            return 0
        else:
            print("\n⚠️ Integration tests show issues. Review before production use.")
            return 1

    except Exception as e:
        print(f"\n💥 Test suite failed with error: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())