|  | """ | 
					
						
						|  | BitTransformerLM Dataset Builder & HuggingFace Integration | 
					
						
						|  |  | 
					
						
						|  | Creates curated datasets optimized for bit-native transformer training with | 
					
						
						|  | comprehensive safety benchmarks, scaling curricula, and progressive complexity. | 
					
						
						|  | """ | 
					
						
						|  |  | 
					
						
						|  | import os | 
					
						
						|  | import json | 
					
						
						|  | import gzip | 
					
						
						|  | import random | 
					
						
						|  | from typing import List, Dict, Any, Optional, Tuple | 
					
						
						|  | from pathlib import Path | 
					
						
						|  | from datetime import datetime | 
					
						
						|  | import tempfile | 
					
						
						|  |  | 
					
						
						|  | import torch | 
					
						
						|  | import numpy as np | 
					
						
						|  | from datasets import Dataset, DatasetDict | 
					
						
						|  | from huggingface_hub import HfApi, login, create_repo | 
					
						
						|  |  | 
					
						
						|  | from .bit_io import text_to_bits, bits_to_text | 
					
						
						|  | from .parity import enforce_parity as _enforce_parity_tensor | 
					
						
						|  | from .compression import compress_bits | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
def compute_negentropy(bit_tensor: torch.Tensor) -> float:
    """Compute negentropy (departure from randomness) of a bit sequence."""
    if bit_tensor.numel() == 0:
        return 0.0

    # Empirical probabilities of ones and zeros.
    p_1 = bit_tensor.float().mean()
    p_0 = 1.0 - p_1

    # Clamp away from {0, 1} so log2 stays finite.
    p_1 = torch.clamp(p_1, min=1e-7, max=1.0 - 1e-7)
    p_0 = torch.clamp(p_0, min=1e-7, max=1.0 - 1e-7)

    # Shannon entropy of the bit distribution, in bits.
    entropy = -(p_1 * torch.log2(p_1) + p_0 * torch.log2(p_0))

    # Normalize against the 1-bit maximum entropy of a binary source.
    max_entropy = 1.0
    negentropy = (max_entropy - entropy) / max_entropy

    return float(negentropy)
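# Minimal sketch of expected behaviour (illustrative only, not used by the
# pipeline): a balanced alternating sequence is maximally entropic, so its
# negentropy is ~0.0, while a constant sequence is maximally ordered, so
# its negentropy is ~1.0 (up to the 1e-7 clamp above).
def _demo_negentropy() -> None:
    balanced = torch.tensor([0, 1] * 8, dtype=torch.float32)
    constant = torch.ones(16, dtype=torch.float32)
    assert compute_negentropy(balanced) < 0.01
    assert compute_negentropy(constant) > 0.99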
					
						
def compute_lz_complexity(bits: List[int]) -> float:
    """Compute a run-length approximation of Lempel-Ziv complexity."""
    if not bits:
        return 0.0

    # Collect run lengths of consecutive identical bits. The early return
    # above guarantees at least one run.
    runs = []
    current_run = 1
    for i in range(1, len(bits)):
        if bits[i] == bits[i - 1]:
            current_run += 1
        else:
            runs.append(current_run)
            current_run = 1
    runs.append(current_run)

    # More (shorter) runs means higher complexity; scale and cap at 1.0.
    complexity = len(runs) / len(bits)
    return min(1.0, complexity * 2)
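# Minimal sketch: run-length behaviour of compute_lz_complexity. An
# alternating sequence is all length-1 runs and saturates at 1.0; a
# constant sequence is a single run (1 run / 16 bits, scaled x2 -> 0.125).
def _demo_lz_complexity() -> None:
    assert compute_lz_complexity([0, 1] * 8) == 1.0
    assert compute_lz_complexity([0] * 16) == 0.125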
					
						
def compute_symbiosis(bit_tensor1: torch.Tensor, bit_tensor2: torch.Tensor) -> float:
    """Compute symbiosis score between two bit sequences."""
    if len(bit_tensor1) != len(bit_tensor2) or len(bit_tensor1) == 0:
        return 0.0

    # Pearson correlation between the two sequences.
    corr = torch.corrcoef(torch.stack([bit_tensor1.float(), bit_tensor2.float()]))[0, 1]

    # Constant sequences have zero variance and yield NaN correlations.
    if torch.isnan(corr):
        return 0.0

    # Map correlation from [-1, 1] onto a [0, 1] symbiosis score.
    symbiosis = (corr + 1) / 2
    return float(symbiosis)
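# Minimal sketch: symbiosis maps Pearson correlation onto [0, 1], so a
# non-constant sequence scores ~1.0 against itself and ~0.0 against its
# bitwise complement.
def _demo_symbiosis() -> None:
    a = torch.tensor([0, 1, 0, 1, 1, 0], dtype=torch.float32)
    assert compute_symbiosis(a, a) > 0.99
    assert compute_symbiosis(a, 1 - a) < 0.01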
					
						
def enforce_parity(bits: List[int]) -> List[int]:
    """Simple parity wrapper for lists."""
    if not bits:
        return bits

    # Work on a copy so the caller's list is not mutated, and pad with
    # zeros until the length is a multiple of 9 (one parity group).
    bits = list(bits)
    while len(bits) % 9 != 0:
        bits.append(0)

    try:
        bits_tensor = torch.tensor(bits, dtype=torch.long)
        corrected_tensor, _ = _enforce_parity_tensor(bits_tensor)
        return corrected_tensor.tolist()
    except Exception:
        # Fall back to the padded-but-uncorrected bits if the tensor
        # parity routine rejects the input.
        return bits
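# Minimal sketch: the list wrapper pads to a multiple of 9 before handing
# off to the tensor routine. How the parity bits themselves are corrected
# is delegated to the .parity module and not assumed here, beyond the
# routine preserving sequence length.
def _demo_enforce_parity() -> None:
    out = enforce_parity([1, 0, 1])
    assert len(out) % 9 == 0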
					
						
class BitTransformerDatasetBuilder:
    """
    Comprehensive dataset builder for BitTransformerLM training.

    Generates:
    - Binary sequences with parity protection
    - Progressive complexity curricula
    - Safety benchmark validation sets
    - Synthetic bit patterns for robustness
    - Compressed sequence variants
    """

    def __init__(self, hf_token: str, repo_id: str = "BitTransformerLM"):
        """Initialize with HuggingFace credentials."""
        self.hf_token = hf_token
        self.repo_id = repo_id
        self.api = HfApi()

        # Authenticate the local HuggingFace client once at construction.
        login(token=hf_token)

        self.config = {
            "version": "1.0.0",
            "created": datetime.now().isoformat(),
            "model_compatibility": "BitTransformerLM",
            "bit_encoding": "parity_protected",
            "max_sequence_length": 512,
            "total_samples": 50000,
            "safety_thresholds": {
                "min_negentropy": 0.1,
                "max_lz_complexity": 0.9,
                "min_symbiosis": 0.3,
            },
        }
					
						
    def generate_text_to_bits_data(self, texts: List[str], max_len: int = 512) -> List[Dict]:
        """Convert text samples to parity-protected bit sequences."""
        samples = []

        for i, text in enumerate(texts):
            try:
                # Truncate on a 9-bit parity-group boundary so enforce_parity
                # cannot round the length past max_len, then zero-pad up to
                # max_len for a uniform sequence length.
                bits = text_to_bits(text)[: (max_len // 9) * 9]
                bits = enforce_parity(bits)

                if len(bits) < max_len:
                    bits.extend([0] * (max_len - len(bits)))

                # Safety metrics for the finished sequence.
                bit_tensor = torch.tensor(bits, dtype=torch.float32)
                negentropy = compute_negentropy(bit_tensor)
                lz_complexity = compute_lz_complexity(bits)

                sample = {
                    "id": f"text_to_bits_{i:06d}",
                    "original_text": text[:100] + "..." if len(text) > 100 else text,
                    "bit_sequence": bits,
                    "sequence_length": len([b for b in bits if b != 0]),
                    "negentropy": float(negentropy),
                    "lz_complexity": float(lz_complexity),
                    "has_parity": True,
                    "category": "text_conversion",
                    # Unused fields are set to None so every sample shares
                    # one schema across categories.
                    "pattern_type": None,
                    "safety_category": None,
                    "target_negentropy": None,
                    "target_complexity": None,
                    "original_id": None,
                    "compression_ratio": None,
                    "original_length": None,
                }
                samples.append(sample)

            except Exception as e:
                print(f"Error processing text {i}: {e}")
                continue

        return samples
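    # Minimal sketch (illustrative, not used by the pipeline): the per-text
    # transform above, runnable without HuggingFace credentials. Only this
    # module's own helpers are used; text_to_bits's exact encoding is not
    # assumed beyond producing 0/1 integers.
    @staticmethod
    def _demo_text_transform(text: str = "hello", max_len: int = 512) -> Dict[str, Any]:
        bits = enforce_parity(text_to_bits(text)[: (max_len // 9) * 9])
        bits = bits + [0] * max(0, max_len - len(bits))
        t = torch.tensor(bits, dtype=torch.float32)
        return {"negentropy": compute_negentropy(t),
                "lz_complexity": compute_lz_complexity(bits)}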
					
						
    def generate_synthetic_patterns(self, num_samples: int = 5000, max_len: int = 512) -> List[Dict]:
        """Generate synthetic bit patterns for robustness testing."""
        samples = []

        patterns = [
            "alternating",
            "blocks",
            "fibonacci",
            "prime_based",
            "random_walk",
            "spiral",
            "fractal",
        ]

        for i in range(num_samples):
            pattern_type = random.choice(patterns)
            bits = self._generate_pattern(pattern_type, max_len)
            bits = enforce_parity(bits)

            bit_tensor = torch.tensor(bits, dtype=torch.float32)
            negentropy = compute_negentropy(bit_tensor)
            lz_complexity = compute_lz_complexity(bits)

            sample = {
                "id": f"synthetic_{pattern_type}_{i:06d}",
                "bit_sequence": bits,
                "sequence_length": len([b for b in bits if b != 0]),
                "negentropy": float(negentropy),
                "lz_complexity": float(lz_complexity),
                "pattern_type": pattern_type,
                "has_parity": True,
                "category": "synthetic_pattern",
                "original_text": None,
                "safety_category": None,
                "target_negentropy": None,
                "target_complexity": None,
                "original_id": None,
                "compression_ratio": None,
                "original_length": None,
            }
            samples.append(sample)

        return samples
					
						
    def generate_safety_benchmarks(self, num_samples: int = 2000) -> List[Dict]:
        """Generate sequences specifically for safety metric validation."""
        samples = []

        safety_targets = [
            ("low_entropy", {"target_negentropy": 0.05, "target_complexity": 0.2}),
            ("medium_entropy", {"target_negentropy": 0.5, "target_complexity": 0.5}),
            ("high_entropy", {"target_negentropy": 0.95, "target_complexity": 0.8}),
            ("edge_cases", {"target_negentropy": 0.99, "target_complexity": 0.99}),
        ]

        samples_per_target = num_samples // len(safety_targets)

        for safety_type, targets in safety_targets:
            for i in range(samples_per_target):
                bits = self._generate_safety_controlled_sequence(
                    targets["target_negentropy"],
                    targets["target_complexity"],
                )
                bits = enforce_parity(bits)

                bit_tensor = torch.tensor(bits, dtype=torch.float32)
                actual_negentropy = compute_negentropy(bit_tensor)
                actual_complexity = compute_lz_complexity(bits)

                sample = {
                    "id": f"safety_{safety_type}_{i:06d}",
                    "bit_sequence": bits,
                    "sequence_length": len(bits),
                    "negentropy": float(actual_negentropy),
                    "lz_complexity": float(actual_complexity),
                    "target_negentropy": targets["target_negentropy"],
                    "target_complexity": targets["target_complexity"],
                    "safety_category": safety_type,
                    "has_parity": True,
                    "category": "safety_benchmark",
                    "original_text": None,
                    "pattern_type": None,
                    "original_id": None,
                    "compression_ratio": None,
                    "original_length": None,
                }
                samples.append(sample)

        return samples
					
						
    def generate_compression_variants(self, base_samples: List[Dict],
                                      compression_ratios: Tuple[float, ...] = (0.5, 0.7, 0.9)) -> List[Dict]:
        """Generate compressed variants of base sequences."""
        compressed_samples = []

        # Note: compress_bits takes no ratio argument, so `ratio` is recorded
        # as metadata only; it does not control the compressor itself.
        for ratio in compression_ratios:
            for sample in base_samples[:1000]:
                try:
                    original_bits = sample["bit_sequence"]

                    bits_tensor = torch.tensor(original_bits, dtype=torch.uint8)
                    compressed_tensor = compress_bits(bits_tensor)
                    compressed_bits = compressed_tensor.tolist()
                    compressed_bits = enforce_parity(compressed_bits)

                    bit_tensor = torch.tensor(compressed_bits, dtype=torch.float32)
                    negentropy = compute_negentropy(bit_tensor)
                    lz_complexity = compute_lz_complexity(compressed_bits)

                    compressed_sample = {
                        "id": f"{sample['id']}_compressed_{ratio}",
                        "original_id": sample["id"],
                        "bit_sequence": compressed_bits,
                        "sequence_length": len(compressed_bits),
                        "negentropy": float(negentropy),
                        "lz_complexity": float(lz_complexity),
                        "compression_ratio": ratio,
                        "original_length": len(original_bits),
                        "has_parity": True,
                        "category": "compressed_variant",
                        "original_text": None,
                        "pattern_type": None,
                        "safety_category": None,
                        "target_negentropy": None,
                        "target_complexity": None,
                    }
                    compressed_samples.append(compressed_sample)

                except Exception:
                    # Skip sequences the compressor cannot handle.
                    continue

        return compressed_samples
					
						
    def _generate_pattern(self, pattern_type: str, length: int) -> List[int]:
        """Generate specific bit patterns."""
        if pattern_type == "alternating":
            return [i % 2 for i in range(length)]

        elif pattern_type == "blocks":
            # Constant blocks of a random size, toggling 0/1 at block edges.
            block_size = random.randint(3, 8)
            pattern = []
            current_bit = 0
            for i in range(length):
                if i % block_size == 0:
                    current_bit = 1 - current_bit
                pattern.append(current_bit)
            return pattern

        elif pattern_type == "fibonacci":
            # Fibonacci sequence reduced mod 2.
            fib = [0, 1]
            while len(fib) < length:
                fib.append((fib[-1] + fib[-2]) % 2)
            return fib[:length]

        elif pattern_type == "prime_based":
            # Mark positions divisible by any of the first few primes.
            primes = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31]
            pattern = []
            for i in range(length):
                is_prime_related = any((i + 1) % p == 0 for p in primes[:5])
                pattern.append(1 if is_prime_related else 0)
            return pattern

        elif pattern_type == "random_walk":
            # Sticky random walk: 70% chance of repeating the previous bit.
            pattern = [random.randint(0, 1)]
            for i in range(1, length):
                if random.random() < 0.7:
                    pattern.append(pattern[-1])
                else:
                    pattern.append(1 - pattern[-1])
            return pattern

        else:
            # Remaining types ("spiral", "fractal") fall back to uniform noise.
            return [random.randint(0, 1) for _ in range(length)]
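    # Minimal sketch: the "fibonacci" branch reduces the Fibonacci sequence
    # mod 2, which is periodic with period 3 (0, 1, 1, 0, 1, 1, ...), so it
    # yields a structured, low-complexity but non-constant pattern.
    @staticmethod
    def _demo_fibonacci_bits(length: int = 9) -> List[int]:
        fib = [0, 1]
        while len(fib) < length:
            fib.append((fib[-1] + fib[-2]) % 2)
        return fib[:length]  # [0, 1, 1, 0, 1, 1, 0, 1, 1]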
					
						
    def _generate_safety_controlled_sequence(self, target_negentropy: float,
                                             target_complexity: float, length: int = 256) -> List[int]:
        """Generate bit sequence targeting specific safety metrics."""
        # Choose a base pattern family according to the negentropy target.
        if target_negentropy < 0.3:
            base_pattern = [0] * (length // 2) + [1] * (length // 2)
        elif target_negentropy > 0.7:
            base_pattern = [random.randint(0, 1) for _ in range(length)]
        else:
            # Mid-range targets: random constant blocks whose size shrinks
            # as the complexity target rises.
            block_size = max(1, int(10 * (1 - target_complexity)))
            base_pattern = []
            current = 0
            for i in range(length):
                if i % block_size == 0:
                    current = random.randint(0, 1)
                base_pattern.append(current)

        # Inject bit-flip noise with probability tied to the complexity
        # target; this is a heuristic and does not hit the target exactly.
        noise_level = max(0.1, target_complexity)
        final_pattern = []
        for bit in base_pattern:
            if random.random() < noise_level:
                final_pattern.append(1 - bit)
            else:
                final_pattern.append(bit)

        return final_pattern
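    # Illustrative check: because the generator above is heuristic, compare
    # the measured metrics against the requested targets rather than
    # assuming they match; generate_safety_benchmarks records both for
    # exactly this reason.
    def _demo_safety_targets(self, target_negentropy: float = 0.05,
                             target_complexity: float = 0.2) -> Tuple[float, float]:
        bits = self._generate_safety_controlled_sequence(target_negentropy, target_complexity)
        t = torch.tensor(bits, dtype=torch.float32)
        return compute_negentropy(t), compute_lz_complexity(bits)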
					
						
    def build_complete_dataset(self, source_texts: Optional[List[str]] = None) -> DatasetDict:
        """Build the complete BitTransformerLM dataset."""
        print("🚀 Building BitTransformerLM Dataset...")

        if source_texts is None:
            source_texts = self._get_default_texts()

        all_samples = []

        print("📝 Generating text-to-bits samples...")
        text_samples = self.generate_text_to_bits_data(source_texts[:10000])
        all_samples.extend(text_samples)

        print("🎨 Generating synthetic patterns...")
        synthetic_samples = self.generate_synthetic_patterns(7500)
        all_samples.extend(synthetic_samples)

        print("🛡️ Generating safety benchmarks...")
        safety_samples = self.generate_safety_benchmarks(5000)
        all_samples.extend(safety_samples)

        print("🗜️ Generating compression variants...")
        compression_samples = self.generate_compression_variants(text_samples[:1000])
        all_samples.extend(compression_samples)

        # Shuffle, then carve out an 80/10/10 train/validation/test split.
        random.shuffle(all_samples)

        total = len(all_samples)
        train_split = int(0.8 * total)
        val_split = int(0.9 * total)

        train_data = all_samples[:train_split]
        val_data = all_samples[train_split:val_split]
        test_data = all_samples[val_split:]

        dataset_dict = DatasetDict({
            'train': Dataset.from_list(train_data),
            'validation': Dataset.from_list(val_data),
            'test': Dataset.from_list(test_data),
        })

        print(f"✅ Dataset built: {len(train_data)} train, {len(val_data)} val, {len(test_data)} test")
        return dataset_dict
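    # Optional reproducibility sketch: the split above relies on
    # random.shuffle with Python's global RNG, so seeding beforehand gives
    # a deterministic train/validation/test assignment. The fixed seed is
    # an assumption of this example, not something the builder enforces.
    def _demo_reproducible_build(self, seed: int = 42) -> DatasetDict:
        random.seed(seed)
        return self.build_complete_dataset()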
					
						
    def _get_default_texts(self) -> List[str]:
        """Get default text corpus for bit conversion."""
        texts = [
            "The quick brown fox jumps over the lazy dog.",
            "In the beginning was the Word, and the Word was with God.",
            "To be or not to be, that is the question.",
            "I think, therefore I am.",
            "The only thing we have to fear is fear itself.",
            "Ask not what your country can do for you.",
            "E = mc²",
            "The mitochondria is the powerhouse of the cell.",
            "SELECT * FROM users WHERE active = 1;",
            "def fibonacci(n): return n if n < 2 else fibonacci(n-1) + fibonacci(n-2)",
            "Binary trees are hierarchical data structures.",
            "The entropy of a system tends to increase over time.",
        ]

        expanded_texts = texts.copy()
        for i in range(500):
            combined = " ".join(random.sample(texts, random.randint(2, 4)))
            expanded_texts.append(combined)

            if i % 50 == 0:
                expanded_texts.append(f"Sample {i}: " + random.choice(texts))

        return expanded_texts
					
						
    def upload_to_huggingface(self, dataset: DatasetDict,
                              private: bool = True) -> str:
        """Upload dataset to HuggingFace Hub."""
        print(f"🌐 Uploading to HuggingFace: {self.repo_id}")

        try:
            create_repo(
                repo_id=self.repo_id,
                repo_type="dataset",
                private=private,
                exist_ok=True,
                token=self.hf_token,
            )

            dataset_info = {
                "dataset_info": self.config,
                "splits": {
                    "train": len(dataset["train"]),
                    "validation": len(dataset["validation"]),
                    "test": len(dataset["test"]),
                },
                "features": {
                    "id": "string",
                    "bit_sequence": "list of integers (0/1)",
                    "sequence_length": "integer",
                    "negentropy": "float",
                    "lz_complexity": "float",
                    "category": "string",
                    "has_parity": "boolean",
                },
                "usage_notes": [
                    "Optimized for BitTransformerLM bit-native training",
                    "All sequences include parity protection",
                    "Safety metrics (K/C/S) computed for each sample",
                    "Supports progressive curriculum learning",
                ],
            }

            dataset.push_to_hub(
                repo_id=self.repo_id,
                token=self.hf_token,
                private=private,
            )

            # Write the sidecar metadata file; flush before uploading so
            # the full JSON is on disk, and remove the temp file afterwards.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(dataset_info, f, indent=2)
                f.flush()
                self.api.upload_file(
                    path_or_fileobj=f.name,
                    path_in_repo="dataset_info.json",
                    repo_id=self.repo_id,
                    repo_type="dataset",
                    token=self.hf_token,
                )
            os.remove(f.name)

            print(f"✅ Dataset uploaded successfully to: https://huggingface.co/datasets/{self.repo_id}")
            return f"https://huggingface.co/datasets/{self.repo_id}"

        except Exception as e:
            print(f"❌ Upload failed: {e}")
            raise
					
						
def create_bittransformerlm_dataset(hf_token: str,
                                    repo_id: str = "BitTransformerLM",
                                    source_texts: Optional[List[str]] = None) -> str:
    """
    Convenience function to create and upload BitTransformerLM dataset.

    Args:
        hf_token: HuggingFace access token
        repo_id: Dataset repository ID
        source_texts: Optional list of source texts for conversion

    Returns:
        URL to the uploaded dataset
    """
    builder = BitTransformerDatasetBuilder(hf_token, repo_id)
    dataset = builder.build_complete_dataset(source_texts)
    return builder.upload_to_huggingface(dataset, private=True)
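

# Example invocation (a sketch, only run when executed as a script):
# reading the token from an HF_TOKEN environment variable is an assumption
# of this example, not a requirement of the module.
if __name__ == "__main__":
    token = os.environ.get("HF_TOKEN")
    if not token:
        raise SystemExit("Set HF_TOKEN to build and upload the dataset.")
    url = create_bittransformerlm_dataset(token)
    print(f"Dataset available at: {url}")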