#!/usr/bin/env python3
"""
Test MarkovSpline + BitTransformerLM Integration

Validates the integration between MarkovSpline and BitTransformerLM
using actual datasets and training procedures.
"""

# NOTE(review): the emoji in the printed messages below look mis-encoded
# (mojibake). They are kept verbatim because the original characters cannot
# be recovered with certainty from what is left.

import copy
import json
import os
import sys
import time
from pathlib import Path

import numpy as np
import torch

# Add MarkovSpline to path
sys.path.insert(0, '/data/MarkovSpline')
from bitpipe_integration import create_markov_spline_bitpipe_module

# BitTransformerLM imports
from bit_transformer.model import BitTransformerLM
from markov_spline_training import MarkovSplineEnhancedTrainer, MarkovSplineEnhancedDataset
from markov_spline_cli import MarkovSplineBitTransformerCLI


def create_simple_dataset(num_samples=100, seq_length=128):
    """Create a simple random dataset for testing.

    Args:
        num_samples: Number of samples to generate.
        seq_length: Number of bits in each input/target tensor.

    Returns:
        A list of ``num_samples`` dicts. Each dict has the keys
        ``'input_bits'`` and ``'target_bits'``, both independent random 0/1
        ``torch.long`` tensors of shape ``(seq_length,)``.
    """
    return [
        {
            'input_bits': torch.randint(0, 2, (seq_length,), dtype=torch.long),
            'target_bits': torch.randint(0, 2, (seq_length,), dtype=torch.long),
        }
        for _ in range(num_samples)
    ]


class TextToBitsConverter:
    """Simple text to bits converter for testing."""

    def text_to_bits(self, text, max_length=128):
        """Convert text to a fixed-length bit sequence.

        The text is encoded as UTF-8 and every byte contributes exactly
        8 bits (most significant bit first). For ASCII text this is the same
        as encoding each character's code point in 8 bits.

        Args:
            text: The string to convert.
            max_length: Length of the returned sequence, in bits.

        Returns:
            A list of ``max_length`` ints (0 or 1). Input that does not fit
            is truncated to ``max_length // 8`` bytes; shorter input is
            zero-padded at the end.
        """
        # Work on bytes rather than code points: format(ord(char), '08b')
        # yields more than 8 digits for code points above 255, which would
        # break the fixed 8-bits-per-unit layout.
        encoded = text.encode('utf-8')[:max_length // 8]
        bit_sequence = [int(bit) for byte in encoded for bit in format(byte, '08b')]

        # Pad or truncate to max_length
        if len(bit_sequence) < max_length:
            bit_sequence.extend([0] * (max_length - len(bit_sequence)))
        else:
            bit_sequence = bit_sequence[:max_length]

        return bit_sequence


def test_markov_spline_preprocessing():
    """Test MarkovSpline data preprocessing functionality.

    Converts a few sample sentences to 64-bit sequences, then calls the
    module's ``process_data`` with the ``'predict_binary'`` and
    ``'preprocess_training'`` operations.

    Returns:
        True if both calls report ``result['success']``; False as soon as
        one of them does not.
    """
    print("\n๐Ÿงช Testing MarkovSpline Data Preprocessing")
    print("=" * 50)

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create test bit sequences
    converter = TextToBitsConverter()
    test_texts = [
        "The quick brown fox jumps over the lazy dog.",
        "Machine learning transforms raw data into insights.",
        "BitTransformerLM processes information at the bit level.",
        "MarkovSpline provides smooth data transitions."
    ]

    bit_sequences = [converter.text_to_bits(text, max_length=64) for text in test_texts]

    print(f"๐Ÿ“ Created {len(bit_sequences)} test bit sequences")

    # Test binary sequence prediction
    print("\n๐Ÿ”ฎ Testing Binary Sequence Prediction")
    result = markov_module.process_data(bit_sequences[0], 'predict_binary', num_predictions=8)

    if result['success']:
        print(" โœ… Prediction successful")
        print(f" ๐ŸŽฏ Predictions: {result['predictions']}")
        print(f" ๐Ÿ“Š Avg confidence: {result['prediction_metrics']['avg_confidence']:.3f}")
        print(f" ๐Ÿ“ˆ Entropy: {result['prediction_metrics']['prediction_entropy']:.3f}")
    else:
        print(f" โŒ Prediction failed: {result.get('error', 'Unknown error')}")
        return False

    # Test data preprocessing
    print("\n๐Ÿ”„ Testing Data Preprocessing")
    result = markov_module.process_data(bit_sequences, 'preprocess_training', binary_data=True)

    if result['success']:
        print(" โœ… Preprocessing successful")
        print(f" ๐Ÿ“ฆ Processed {len(result['processed_sequences'])} sequences")
        print(f" ๐Ÿ“‹ Summary: {result['preprocessing_summary']}")
    else:
        print(f" โŒ Preprocessing failed: {result.get('error', 'Unknown error')}")
        return False

    return True


def test_enhanced_dataset():
    """Test MarkovSpline enhanced dataset wrapper.

    Wraps a small random dataset in ``MarkovSplineEnhancedDataset`` and reads
    up to five samples, counting how many carry a truthy
    ``'smoothing_applied'`` entry.

    Returns:
        True if more than half of the inspected samples report smoothing.
    """
    print("\n๐Ÿ“ฆ Testing Enhanced Dataset Wrapper")
    print("=" * 50)

    # Create base dataset
    base_dataset = create_simple_dataset(num_samples=20, seq_length=32)
    print(f"๐Ÿ“ Created base dataset with {len(base_dataset)} samples")

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create enhanced dataset
    enhanced_dataset = MarkovSplineEnhancedDataset(
        base_dataset,
        markov_module,
        smoothing_strength=0.15,
        enable_smoothing=True
    )

    print("๐ŸŒŠ Created enhanced dataset with MarkovSpline preprocessing")

    # Test data loading
    test_samples = []
    smoothing_success_count = 0

    for i in range(min(5, len(enhanced_dataset))):
        sample = enhanced_dataset[i]
        test_samples.append(sample)

        if sample.get('smoothing_applied', False):
            smoothing_success_count += 1

        print(f" Sample {i}: smoothing_applied = {sample.get('smoothing_applied', False)}")

    # Guard against an empty wrapper so the ratio never divides by zero.
    success_rate = smoothing_success_count / len(test_samples) if test_samples else 0
    print(f"โœ… Smoothing success rate: {success_rate:.2%}")

    return success_rate > 0.5


def test_gradient_smoothing():
    """Test gradient smoothing functionality.

    Runs one ``train_step`` of ``MarkovSplineEnhancedTrainer`` (gradient
    smoothing on, data smoothing off) on a random batch and prints the
    reported metrics.

    Returns:
        True when the step and the metric lookups complete without raising.
    """
    print("\nโšก Testing Gradient Smoothing")
    print("=" * 50)

    # Create small test model
    model = BitTransformerLM(
        d_model=32,
        nhead=2,
        num_layers=2,
        dim_feedforward=64,
        max_seq_len=128
    )

    # Create test data
    batch_size = 4
    seq_length = 64

    test_batch = {
        'input_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (batch_size, seq_length), dtype=torch.long)
    }

    # Initialize enhanced trainer
    trainer = MarkovSplineEnhancedTrainer(
        model=model,
        gradient_smoothing=True,
        data_smoothing=False,
        smoothing_strength=0.2,
        learning_rate=1e-3
    )

    print("๐Ÿง  Initialized enhanced trainer")

    # Test training step with gradient smoothing.
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start_time = time.perf_counter()
    metrics = trainer.train_step(test_batch)
    end_time = time.perf_counter()

    print(f" โœ… Training step completed in {end_time - start_time:.3f}s")
    print(f" ๐Ÿ“Š Loss: {metrics['loss']:.4f}")
    print(f" ๐ŸŒŠ Smoothing applied: {metrics.get('smoothing_applied', 0):.3f}")

    # Get MarkovSpline metrics
    markov_metrics = trainer.get_markov_spline_metrics()
    print(f" ๐Ÿ“ˆ MarkovSpline operations: {markov_metrics['processing_operations']}")

    return True


def test_cli_interface():
    """Test command-line interface functionality.

    Initializes ``MarkovSplineBitTransformerCLI`` and requests a
    ``'predict_binary'`` smoothing of a short hand-written bit sequence.

    Returns:
        True if initialization succeeds and the call reports
        ``result['success']``; False otherwise.
    """
    print("\n๐Ÿ’ป Testing CLI Interface")
    print("=" * 50)

    # Initialize CLI
    cli = MarkovSplineBitTransformerCLI()

    if not cli.initialize_markov_spline():
        print(" โŒ CLI initialization failed")
        return False

    print(" โœ… CLI initialized successfully")

    # Test bit sequence smoothing
    test_bits = [0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1]
    result = cli.smooth_bit_sequence(test_bits, 'predict_binary', num_predictions=5)

    if result['success']:
        print(" โœ… Bit sequence smoothing successful")
        print(f" ๐ŸŽฏ Predictions: {result['predictions']}")
    else:
        print(f" โŒ Bit sequence smoothing failed: {result.get('error', 'Unknown')}")
        return False

    return True


def run_integration_benchmark():
    """Run comprehensive integration benchmark.

    Times ten sample reads from the plain and the MarkovSpline-enhanced
    dataset, then times one training step with ``BitwiseTrainer`` and one
    with ``MarkovSplineEnhancedTrainer`` on the same batch.

    Returns:
        A dict with ``'data_loading_overhead'`` and ``'training_overhead'``
        (percent relative to the baseline, 0 when the baseline time is 0),
        plus ``'standard_loss'`` and ``'enhanced_loss'``.
    """
    print("\n๐Ÿƒ Running Integration Benchmark")
    print("=" * 50)

    # Create test dataset
    dataset = create_simple_dataset(num_samples=50, seq_length=64)

    # Initialize MarkovSpline module
    markov_module = create_markov_spline_bitpipe_module()

    # Create enhanced dataset
    enhanced_dataset = MarkovSplineEnhancedDataset(
        dataset, markov_module, smoothing_strength=0.1, enable_smoothing=True
    )

    # Time data loading with and without smoothing
    print("\nโฑ๏ธ Benchmarking Data Loading Performance")

    # Benchmark without smoothing
    start_time = time.perf_counter()
    for i in range(10):
        dataset[i % len(dataset)]
    base_time = time.perf_counter() - start_time

    # Benchmark with smoothing
    start_time = time.perf_counter()
    for i in range(10):
        enhanced_dataset[i % len(enhanced_dataset)]
    enhanced_time = time.perf_counter() - start_time

    overhead = ((enhanced_time - base_time) / base_time) * 100 if base_time > 0 else 0

    print(f" ๐Ÿ“Š Base loading time: {base_time:.3f}s")
    print(f" ๐ŸŒŠ Enhanced loading time: {enhanced_time:.3f}s")
    print(f" ๐Ÿ“ˆ Smoothing overhead: {overhead:.1f}%")

    # Test training performance
    print("\n๐Ÿ‹๏ธ Benchmarking Training Performance")

    model = BitTransformerLM(d_model=64, nhead=4, num_layers=2, dim_feedforward=128, max_seq_len=128)

    # Standard trainer (use the one from markov_spline_training)
    from markov_spline_training import BitwiseTrainer
    standard_trainer = BitwiseTrainer(model, learning_rate=1e-3)

    # Enhanced trainer
    enhanced_trainer = MarkovSplineEnhancedTrainer(
        model, gradient_smoothing=True, data_smoothing=True,
        smoothing_strength=0.15, learning_rate=1e-3
    )

    # Benchmark training step
    test_batch = {
        'input_bits': torch.randint(0, 2, (4, 64), dtype=torch.long),
        'target_bits': torch.randint(0, 2, (4, 64), dtype=torch.long)
    }

    # Both trainers wrap the same model instance. Snapshot the weights so the
    # enhanced step starts from the same state as the standard step;
    # otherwise the loss comparison mixes in one extra update.
    # NOTE(review): assumes train_step updates the model in place - confirm.
    initial_state = copy.deepcopy(model.state_dict())

    # Standard training step
    start_time = time.perf_counter()
    standard_metrics = standard_trainer.train_step(test_batch)
    standard_time = time.perf_counter() - start_time

    # load_state_dict copies values into the existing parameters, so the
    # enhanced trainer keeps referring to the same tensors.
    model.load_state_dict(initial_state)

    # Enhanced training step
    start_time = time.perf_counter()
    enhanced_metrics = enhanced_trainer.train_step(test_batch)
    enhanced_time = time.perf_counter() - start_time

    training_overhead = ((enhanced_time - standard_time) / standard_time) * 100 if standard_time > 0 else 0

    print(f" ๐Ÿ“Š Standard training step: {standard_time:.3f}s")
    print(f" ๐ŸŒŠ Enhanced training step: {enhanced_time:.3f}s")
    print(f" ๐Ÿ“ˆ Enhancement overhead: {training_overhead:.1f}%")
    print(f" ๐ŸŽฏ Standard loss: {standard_metrics['loss']:.4f}")
    print(f" ๐ŸŽฏ Enhanced loss: {enhanced_metrics['loss']:.4f}")

    return {
        'data_loading_overhead': overhead,
        'training_overhead': training_overhead,
        'standard_loss': standard_metrics['loss'],
        'enhanced_loss': enhanced_metrics['loss']
    }


def main():
    """Run comprehensive MarkovSpline integration tests.

    Runs the four tests and the benchmark, prints a summary and writes the
    results as JSON to a fixed path.

    Returns:
        0 when at least 75% of the tests passed, 1 otherwise or when any
        test or the benchmark raised an exception.
    """
    print("๐ŸŒŠ MarkovSpline + BitTransformerLM Integration Tests")
    print("=" * 60)

    results = {
        'preprocessing_test': False,
        'enhanced_dataset_test': False,
        'gradient_smoothing_test': False,
        'cli_interface_test': False,
        'benchmark_results': None
    }

    try:
        # Run individual tests
        results['preprocessing_test'] = test_markov_spline_preprocessing()
        results['enhanced_dataset_test'] = test_enhanced_dataset()
        results['gradient_smoothing_test'] = test_gradient_smoothing()
        results['cli_interface_test'] = test_cli_interface()

        # Run benchmark
        results['benchmark_results'] = run_integration_benchmark()

        # Summary
        print("\n๐Ÿ“‹ Test Results Summary")
        print("=" * 60)

        # Don't count benchmark as pass/fail; derive the total from the
        # results dict so it cannot drift when tests are added or removed.
        test_outcomes = {
            test_name: outcome
            for test_name, outcome in results.items()
            if test_name != 'benchmark_results'
        }
        total_tests = len(test_outcomes)
        passed_tests = 0

        for test_name, result in test_outcomes.items():
            status = "โœ… PASSED" if result else "โŒ FAILED"
            print(f" {test_name}: {status}")
            if result:
                passed_tests += 1

        success_rate = (passed_tests / total_tests) * 100
        print(f"\n๐ŸŽฏ Overall Success Rate: {success_rate:.1f}% ({passed_tests}/{total_tests})")

        if results['benchmark_results']:
            benchmark = results['benchmark_results']
            print("\n๐Ÿ“Š Performance Impact:")
            print(f" - Data loading overhead: {benchmark['data_loading_overhead']:.1f}%")
            print(f" - Training overhead: {benchmark['training_overhead']:.1f}%")
            print(f" - Loss comparison: {benchmark['standard_loss']:.4f} โ†’ {benchmark['enhanced_loss']:.4f}")

        # Save results. A write failure (e.g. the directory does not exist on
        # this machine) must not turn a passing run into a failing exit code,
        # so it is reported as a warning instead of reaching the outer handler.
        results_file = '/data/BitTransformerLM/BitTransformerLM/markov_integration_test_results.json'
        try:
            with open(results_file, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, default=str)
        except OSError as e:
            print(f"\nWarning: could not save results to {results_file}: {e}")
        else:
            print(f"\n๐Ÿ“ Results saved to: {results_file}")

        if success_rate >= 75:
            print("\n๐Ÿš€ Integration tests PASSED! Ready for production use.")
            return 0
        else:
            print("\nโš ๏ธ Integration tests show issues. Review before production use.")
            return 1

    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"\n๐Ÿ’ฅ Test suite failed with error: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())