import json
import os
import re
import shutil
import subprocess
import tempfile
import textwrap
from concurrent.futures import ThreadPoolExecutor
from hashlib import md5

import browser_cookie3  # imported by the original module; not referenced below
import whisper
from transformers import pipeline


class VideoProcessor:
    """Download a video's audio, transcribe it and summarise the transcript.

    The pipeline is: ``yt-dlp`` (audio download) -> Whisper (speech to text)
    -> a Hugging Face summarisation pipeline (summary and key points).
    Results of :meth:`process` are cached as JSON files in the current
    working directory.
    """

    # Prefix of the temporary directories created for downloads. It lets the
    # cleanup code recognise (and only ever delete) directories made here.
    _TEMP_DIR_PREFIX = "video_processor_"

    # NOTE(review): "like" is removed wherever it appears as a whole word,
    # including legitimate uses ("I like this"). Kept as in the original.
    _FILLER_PATTERN = re.compile(r'\b(um|uh|like|you know)\b', flags=re.IGNORECASE)
    _WHITESPACE_PATTERN = re.compile(r'\s+')

    def __init__(self):
        """Create the summarisation pipeline and an empty Whisper model cache."""
        self.summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
        self.models = {}  # Whisper models keyed by size name, loaded lazily
        self.cookie_file = "cookies.txt"  # Path to your cookies file

    def load_model(self, model_size="base"):
        """Return the Whisper model for *model_size*, loading it on first use."""
        if model_size not in self.models:
            self.models[model_size] = whisper.load_model(model_size)
        return self.models[model_size]

    def _run_yt_dlp(self, url, options, failure_label):
        """Run ``yt-dlp`` with *options* and return the downloaded mp3 path.

        The audio is written to a fresh temporary directory. That directory
        is removed again if anything goes wrong, so failed attempts leave
        nothing behind.

        Raises:
            Exception: if ``yt-dlp`` exits with a non-zero status (message is
                ``"<failure_label>: <stderr>"``) or produced no mp3 file.
        """
        download_dir = tempfile.mkdtemp(prefix=self._TEMP_DIR_PREFIX)
        cmd = [
            "yt-dlp",
            *options,
            "-o", os.path.join(download_dir, "audio.%(ext)s"),
            # "--" ends option parsing so a URL starting with "-" cannot be
            # interpreted as a yt-dlp option.
            "--",
            url,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise Exception(f"{failure_label}: {result.stderr}")
            return self._find_downloaded_file(download_dir)
        except BaseException:
            shutil.rmtree(download_dir, ignore_errors=True)
            raise

    def _download_with_cookies(self, url):
        """Method 1: Download using browser cookies"""
        options = [
            "--cookies", self.cookie_file,
            "--extract-audio",
            "--audio-format", "mp3",
            "--audio-quality", "0",
            "--quiet",
        ]
        return self._run_yt_dlp(url, options, "Cookie download failed")

    def _download_with_yt_dlp(self, url):
        """Method 2: Regular download"""
        options = [
            "--extract-audio",
            "--audio-format", "mp3",
            "--quiet",
        ]
        return self._run_yt_dlp(url, options, "Download failed")

    def _find_downloaded_file(self, search_dir=None):
        """Helper to find downloaded audio file

        Args:
            search_dir: directory to search recursively. When omitted the
                whole system temporary directory is searched, which may pick
                up an unrelated mp3; callers that know the download directory
                should pass it.

        Raises:
            Exception: if no ``.mp3`` file is found.
        """
        if search_dir is None:
            search_dir = tempfile.gettempdir()
        for root, _, files in os.walk(search_dir):
            for file_name in files:
                if file_name.endswith('.mp3'):
                    return os.path.join(root, file_name)
        raise Exception("Downloaded audio file not found")

    def _discard_audio(self, audio_path):
        """Delete the temporary download directory that holds *audio_path*.

        Only directories whose name carries this class's temp prefix are
        removed, so a path that was not created by a download is left alone.
        """
        parent = os.path.dirname(os.path.abspath(audio_path))
        if os.path.basename(parent).startswith(self._TEMP_DIR_PREFIX):
            shutil.rmtree(parent, ignore_errors=True)

    def download_audio(self, url, use_cookies=False):
        """Robust download with fallback methods

        When *use_cookies* is true and the cookie file exists, the cookie
        based download is tried first; the regular download is always tried
        as the (last) fallback.

        Returns:
            Path of the downloaded mp3 file, inside a temporary directory.

        Raises:
            Exception: if every method failed; the message lists each error.
        """
        methods = []
        if use_cookies and os.path.exists(self.cookie_file):
            methods.append(self._download_with_cookies)
        methods.append(self._download_with_yt_dlp)

        errors = []
        last_error = None
        for method in methods:
            try:
                return method(url)
            except Exception as exc:
                errors.append(str(exc))
                last_error = exc
        raise Exception(
            f"All download methods failed: {'; '.join(errors)}"
        ) from last_error

    def transcribe_audio(self, audio_path, model_size="base"):
        """Transcribe the audio file with Whisper and return the text."""
        model = self.load_model(model_size)
        result = model.transcribe(audio_path)
        return result["text"]

    def clean_transcript(self, text):
        """Strip filler words and collapse runs of whitespace to one space."""
        without_fillers = self._FILLER_PATTERN.sub('', text)
        return self._WHITESPACE_PATTERN.sub(' ', without_fillers).strip()

    def summarize_chunk(self, chunk):
        """Summarise one chunk of text and return the summary string."""
        # truncation=True keeps an over-long chunk (large chunk_size) within
        # the model's maximum input length instead of failing.
        output = self.summarizer(
            chunk, max_length=150, min_length=30, truncation=True
        )
        return output[0]['summary_text']

    def summarize_text(self, text, chunk_size=1000):
        """Summarise *text* chunk by chunk and join the partial summaries.

        Args:
            text: raw transcript; it is cleaned with :meth:`clean_transcript`.
            chunk_size: maximum number of characters per chunk (must be > 0).

        Returns:
            The chunk summaries joined with newlines; ``""`` for empty text.

        Raises:
            ValueError: if *chunk_size* is not positive.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")

        text = self.clean_transcript(text)
        # Break at word boundaries so no word is cut in half between chunks;
        # words longer than chunk_size are still split to honour the limit.
        chunks = textwrap.wrap(text, width=chunk_size, break_on_hyphens=False)
        if not chunks:
            return ""

        with ThreadPoolExecutor(max_workers=4) as executor:
            summaries = list(executor.map(self.summarize_chunk, chunks))
        return "\n".join(summaries)

    def extract_key_points(self, text):
        """Return key points of *text* as a newline-separated bullet list.

        Only the first 8000 characters of *text* are used. Every non-empty
        line of the model output becomes one ``- `` bullet.
        """
        prompt = (
            "Extract 5-7 key points from this transcript. Each point should:\n"
            "- Start with a bullet (-)\n"
            "- Be concise but specific\n"
            "- Include numbers/dates when mentioned\n"
            "\n"
            f"Transcript: {text[:8000]}\n"
            "\n"
            "Key Points:"
        )
        # The prompt can exceed the model's maximum input length, so let the
        # pipeline truncate it rather than fail.
        output = self.summarizer(
            prompt, max_length=300, min_length=100, truncation=True
        )
        result = output[0]['summary_text']

        bullets = []
        for line in result.splitlines():
            line = line.strip()
            if not line:
                continue
            bullets.append(line if line.startswith("-") else f"- {line}")
        return "\n".join(bullets)

    def get_video_id(self, url):
        """Return the hex MD5 digest of *url*, used as the cache key."""
        return md5(url.encode()).hexdigest()

    def _read_cache(self, cache_file):
        """Return the cached result stored in *cache_file*, or ``None``.

        A missing, unreadable or corrupt cache file is treated as a miss.
        """
        try:
            with open(cache_file, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and decoding errors.
            return None

    def process(self, youtube_url, chunk_size=1000, model_size="base", use_cookies=False):
        """Download, transcribe and summarise the video at *youtube_url*.

        The result is cached in ``cache_<md5 of url>.json`` in the current
        working directory. The cache is keyed on the URL only, so a cached
        result is returned even if *chunk_size* or *model_size* differ.

        Returns:
            A dict with ``'summary'``, ``'key_points'`` and ``'transcript'``
            (first 2000 characters, followed by ``"..."`` if truncated), or
            ``{'error': <message>}`` if any step failed.
        """
        video_id = self.get_video_id(youtube_url)
        cache_file = f"cache_{video_id}.json"

        cached = self._read_cache(cache_file)
        if cached is not None:
            return cached

        try:
            audio_path = self.download_audio(youtube_url, use_cookies)
            try:
                transcript = self.transcribe_audio(audio_path, model_size)
            finally:
                # The audio is no longer needed once transcription is done.
                self._discard_audio(audio_path)

            result = {
                'summary': self.summarize_text(transcript, chunk_size),
                'key_points': self.extract_key_points(transcript),
                'transcript': transcript[:2000] + ("..." if len(transcript) > 2000 else "")
            }
        except Exception as e:
            return {'error': str(e)}

        # Caching is best-effort: a failed write must not discard the result.
        try:
            with open(cache_file, 'w', encoding="utf-8") as f:
                json.dump(result, f)
        except OSError:
            pass
        return result