"""Flask web app that downloads YouTube subtitles, analyzes the transcript
with spaCy and Google Gemini, translates the results to Hindi, and serves
the generated Markdown reports."""

from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    flash,
    jsonify,
    session,
    send_from_directory,
)
import os
import re
import sys
import json
import tempfile
import time
import threading

import yt_dlp
import spacy
import google.generativeai as genai
from werkzeug.utils import secure_filename

app = Flask(__name__)
app.secret_key = os.urandom(24)  # Required for flash and session

# Configuration
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
RESULTS_FOLDER = os.path.join(os.getcwd(), 'results')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['RESULTS_FOLDER'] = RESULTS_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size

# Create required directories if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)

# Default API key (can be overridden in the UI).
# SECURITY: prefer the GEMINI_API_KEY environment variable. The hard-coded
# fallback is kept only for backward compatibility — it is committed to
# source control and should be revoked and removed.
DEFAULT_API_KEY = os.environ.get(
    "GEMINI_API_KEY", "AIzaSyB0IOx76FydAk4wabMz1juzzHF5oBiHW64"
)

# Global variable to track processing status.
# NOTE(review): shared between request handlers and the background worker
# thread without a lock. Acceptable for a single-user dev server; a real
# deployment needs per-job state and synchronization.
processing_status = {
    'is_processing': False,
    'current_step': '',
    'progress': 0,
    'log': []
}

# Initialize spaCy NLP pipeline; download the English model on first run.
try:
    nlp = spacy.load('en_core_web_sm')
except OSError:
    import subprocess
    # Use the running interpreter, not whatever "python" is on PATH.
    subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"])
    nlp = spacy.load('en_core_web_sm')

# Configuration for yt_dlp: fetch only (auto-)subtitles, never the video.
YDL_OPTS = {
    'skip_download': True,
    'writesubtitles': True,
    'writeautomaticsub': True,
    'subtitleslangs': ['en'],
    'outtmpl': '%(id)s.%(ext)s',
}


def update_status(step, progress, message):
    """Update the processing status.

    Records the current step/progress and appends a timestamped entry to
    the in-memory log that the /status endpoint exposes.
    """
    processing_status['current_step'] = step
    processing_status['progress'] = progress
    processing_status['log'].append(
        {'time': time.strftime('%H:%M:%S'), 'message': message}
    )
    print(f"Status: {step} - {progress}% - {message}")


def download_subtitles(video_url):
    """
    Downloads (auto-)subtitles for the given YouTube URL.
    Returns the filename of the downloaded subtitle file (.srt or .vtt)
    and the video title.

    Raises FileNotFoundError if no subtitle file appears in the current
    working directory (yt_dlp writes them next to the process's cwd).
    """
    update_status('download_subtitles', 10, f"Downloading subtitles for {video_url}...")

    with yt_dlp.YoutubeDL(YDL_OPTS) as ydl:
        info = ydl.extract_info(video_url, download=True)
        video_id = info.get('id')
        video_title = info.get('title', 'Unknown Title')

    update_status('download_subtitles', 20, f"Video title: {video_title}")

    # Check for standard filename patterns
    for ext in ('.en.vtt', '.en.srt', '.vtt', '.srt'):
        potential_names = [
            f"{video_id}{ext}",
            f"{video_id}.en{ext}",
        ]
        for fname in potential_names:
            if os.path.exists(fname):
                update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
                return fname, video_title

    # Fallback: find any subtitle file for this video_id
    for fname in os.listdir('.'):
        if fname.startswith(video_id) and fname.lower().endswith(('.srt', '.vtt')):
            update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
            return fname, video_title

    raise FileNotFoundError(f"Subtitle file for {video_id} not found.")


def extract_dialogue_from_srt(path):
    """
    Reads a subtitle file (.srt or .vtt), removes timestamps and metadata,
    and returns cleaned dialogue as a single string.
    """
    update_status('extract_dialogue', 40, f"Extracting dialogue from {path}...")

    # Matches SRT ("00:00:01,000 -->") and VTT ("00:00:01.000 -->") cue lines.
    pattern_timestamp = re.compile(r"^\d{2}:\d{2}:\d{2}[\.,]\d+ -->")
    cleaned_lines = []

    with open(path, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            line = line.strip()
            # Skip empty, index, timestamp, or styling lines
            if (not line or re.match(r"^\d+$", line)
                    or pattern_timestamp.match(line)
                    or line.startswith(('WEBVTT', 'Kind:', 'Language:'))):
                continue
            # Remove inline tags such as <c> or <00:00:01.000>
            text = re.sub(r"<[^>]+>", "", line)
            cleaned_lines.append(text)

    # Join lines with smart handling of sentence boundaries
    dialogue = " ".join(cleaned_lines)
    # Clean up multiple spaces
    dialogue = re.sub(r'\s+', ' ', dialogue)
    return dialogue


def process_text_with_spacy(text):
    """
    Runs spaCy NLP pipeline to perform sentence segmentation, highlight
    named entities, and returns a formatted string (entities wrapped as
    "**text (LABEL)**", sentences separated by blank lines).
    """
    update_status('process_text_with_spacy', 50, "Processing text with spaCy...")
    doc = nlp(text)

    formatted = []
    for sent in doc.sents:
        sent_text = sent.text.strip()
        # Skip empty sentences or sentences with just punctuation
        if len(sent_text) <= 1:
            continue

        entities = {}
        for ent in sent.ents:
            entities[ent.text] = ent.label_

        if entities:
            # NOTE(review): plain str.replace can double-wrap when one
            # entity's text is a substring of another's — confirm acceptable.
            for entity, label in entities.items():
                sent_text = sent_text.replace(entity, f"**{entity} ({label})**")

        formatted.append(sent_text)

    return "\n\n".join(formatted)


def _extract_json_payload(text):
    """Strip optional Markdown code fences from a model response.

    Gemini often wraps JSON in ```json ... ``` fences; return the inner
    payload, or the text unchanged when no fence is present.
    """
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].strip()
    return text


def process_with_gemini(api_key, text, video_title):
    """
    Sends the processed transcript to Gemini API for final formatting and
    analysis. Returns a dict with keys: summary, topics,
    formatted_transcript, notable_quotes. Falls back to a stub dict
    carrying the raw response when the reply is not valid JSON.
    """
    update_status('process_with_gemini', 60, "Sending to Gemini for final processing...")

    # Configure the Gemini API
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash')

    prompt = f"""
I'm providing a transcript from the YouTube video titled: "{video_title}"

Please analyze this transcript and return a JSON object with the following fields:
1. "summary": An array of bullet points summarizing key points (5-7 items)
2. "topics": An array of main topics discussed (3-5 items)
3. "formatted_transcript": A well-formatted version of the transcript
4. "notable_quotes": An array of 3-5 notable quotes from the transcript

Here's the raw transcript:
{text}

Return your analysis as a valid JSON object containing all requested fields.
"""

    response = model.generate_content(prompt)

    try:
        # Try to parse the response as JSON (unwrapping code fences first)
        result = json.loads(_extract_json_payload(response.text))
        update_status('process_with_gemini', 70, "Gemini processing complete")
        return result
    except json.JSONDecodeError:
        # If JSON parsing fails, return a structured response with the raw text
        update_status('process_with_gemini', 70,
                      "Warning: Could not parse Gemini response as JSON")
        return {
            "summary": ["Unable to parse Gemini response as JSON"],
            "topics": ["Error in processing"],
            "formatted_transcript": response.text,
            "notable_quotes": []
        }


def translate_to_hindi(api_key, results):
    """
    Translates the processed results to Hindi using Gemini AI.

    Each list field (summary, topics, notable_quotes) is translated as a
    JSON batch with a per-item fallback; the transcript is translated in
    paragraph batches. Returns a dict with the same shape as *results*.
    """
    update_status('translate_to_hindi', 80, "Translating results to Hindi using Gemini...")

    # Configure the Gemini API
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel('gemini-2.0-flash')  # Using flash model for faster response

    # Create a copy of the results for Hindi translation
    hindi_results = {
        "summary": [],
        "topics": [],
        "formatted_transcript": "",
        "notable_quotes": []
    }

    # Translate summary points
    summary_prompt = f"""
Translate the following English bullet points to Hindi. Keep formatting and meaning intact:
{json.dumps(results["summary"], indent=2)}

Return the result as a JSON array.
"""
    summary_response = model.generate_content(summary_prompt)
    try:
        hindi_results["summary"] = json.loads(_extract_json_payload(summary_response.text))
        update_status('translate_to_hindi', 82, "Summary translation complete.")
    except Exception as e:
        update_status('translate_to_hindi', 82, f"Error in summary translation: {e}")
        # Fallback: process items individually
        for point in results["summary"]:
            prompt = f"Translate this to Hindi: {point}"
            response = model.generate_content(prompt)
            hindi_results["summary"].append(response.text.strip())

    # Translate topics
    topics_prompt = f"""
Translate the following English topics to Hindi. Keep formatting and meaning intact:
{json.dumps(results["topics"], indent=2)}

Return the result as a JSON array.
"""
    topics_response = model.generate_content(topics_prompt)
    try:
        hindi_results["topics"] = json.loads(_extract_json_payload(topics_response.text))
        update_status('translate_to_hindi', 85, "Topics translation complete.")
    except Exception as e:
        update_status('translate_to_hindi', 85, f"Error in topics translation: {e}")
        # Fallback
        for topic in results["topics"]:
            prompt = f"Translate this to Hindi: {topic}"
            response = model.generate_content(prompt)
            hindi_results["topics"].append(response.text.strip())

    # Translate notable quotes
    quotes_prompt = f"""
Translate the following English quotes to Hindi. Keep formatting and meaning intact:
{json.dumps(results["notable_quotes"], indent=2)}

Return ONLY the translated Hindi text in JSON array format.
"""
    quotes_response = model.generate_content(quotes_prompt)
    try:
        hindi_results["notable_quotes"] = json.loads(_extract_json_payload(quotes_response.text))
        update_status('translate_to_hindi', 88, "Quotes translation complete.")
    except Exception as e:
        update_status('translate_to_hindi', 88, f"Error in quotes translation: {e}")
        # Fallback
        for quote in results["notable_quotes"]:
            prompt = f"Translate this to Hindi: {quote}"
            response = model.generate_content(prompt)
            hindi_results["notable_quotes"].append(response.text.strip())

    # Translate the formatted transcript (may need to be chunked for long texts)
    transcript = results["formatted_transcript"]

    # Split transcript into paragraphs
    paragraphs = transcript.split("\n\n")
    translated_paragraphs = []

    # Process paragraphs in batches
    batch_size = 5  # Adjust based on average paragraph length
    total_paragraphs = len(paragraphs)

    for i in range(0, total_paragraphs, batch_size):
        batch = paragraphs[i:i + batch_size]
        batch_text = "\n\n".join(batch)

        progress = 88 + (i / total_paragraphs * 10)  # Scale from 88% to 98%
        update_status('translate_to_hindi', int(progress),
                      f"Translating transcript paragraphs {i + 1} to "
                      f"{min(i + batch_size, total_paragraphs)} of {total_paragraphs}")

        translate_prompt = f"""
Translate the following English text to Hindi. Preserve paragraph breaks and formatting:

{batch_text}

Return ONLY the translated Hindi text.
"""
        try:
            response = model.generate_content(translate_prompt)
            translated_batch = response.text.strip()
            translated_paragraphs.append(translated_batch)
        except Exception as e:
            update_status('translate_to_hindi', int(progress),
                          f"Error in batch translation: {e}")
            # Fallback: translate paragraph by paragraph
            for para in batch:
                try:
                    prompt = f"Translate this to Hindi: {para}"
                    response = model.generate_content(prompt)
                    translated_paragraphs.append(response.text.strip())
                except Exception:
                    # In case of failure, add original paragraph
                    translated_paragraphs.append(f"[Translation error: {para[:50]}...]")

    # Join all translated content
    hindi_results["formatted_transcript"] = "\n\n".join(translated_paragraphs)
    update_status('translate_to_hindi', 98, "Transcript translation complete.")

    return hindi_results


def save_results(results, output_file):
    """
    Saves the processed results to a Markdown file, followed by the raw
    JSON in a fenced code block.
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        # First write a markdown-formatted version
        f.write("# Transcript Analysis\n\n")

        f.write("## Summary\n")
        for point in results["summary"]:
            f.write(f"- {point}\n")
        f.write("\n")

        f.write("## Topics\n")
        for topic in results["topics"]:
            f.write(f"- {topic}\n")
        f.write("\n")

        f.write("## Notable Quotes\n")
        for quote in results["notable_quotes"]:
            f.write(f"> {quote}\n\n")
        f.write("\n")

        f.write("## Formatted Transcript\n\n")
        f.write(results["formatted_transcript"])
        f.write("\n\n")

        # Also save the raw JSON
        f.write("---\n\n")
        f.write("```json\n")
        json.dump(results, f, indent=2)
        f.write("\n```\n")

    update_status('save_results', 99, f"Results saved to {output_file}")


def save_hindi_results(hindi_results, output_file):
    """
    Saves the Hindi translated results to a Markdown file (Hindi section
    headings), followed by the raw JSON with ensure_ascii=False so
    Devanagari stays readable.
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        # First write a markdown-formatted version
        f.write("# प्रतिलेख विश्लेषण\n\n")

        f.write("## सारांश\n")
        for point in hindi_results["summary"]:
            f.write(f"- {point}\n")
        f.write("\n")

        f.write("## विषय\n")
        for topic in hindi_results["topics"]:
            f.write(f"- {topic}\n")
        f.write("\n")

        f.write("## उल्लेखनीय उद्धरण\n")
        for quote in hindi_results["notable_quotes"]:
            f.write(f"> {quote}\n\n")
        f.write("\n")

        f.write("## स्वरूपित प्रतिलेख\n\n")
        f.write(hindi_results["formatted_transcript"])
        f.write("\n\n")

        # Also save the raw JSON
        f.write("---\n\n")
        f.write("```json\n")
        json.dump(hindi_results, f, indent=2, ensure_ascii=False)
        f.write("\n```\n")

    update_status('save_hindi_results', 100, f"Hindi results saved to {output_file}")


def process_youtube_url(youtube_url, api_key):
    """Process a YouTube URL and return the analysis results.

    Runs the full pipeline (subtitles → cleanup → spaCy → Gemini →
    save → Hindi translation → save) and returns a success/error dict.
    Runs on a background thread, so the return value is observed only by
    tests; web clients poll /status and /list_results instead.
    """
    global processing_status

    try:
        processing_status = {
            'is_processing': True,
            'current_step': 'Starting',
            'progress': 0,
            'log': []
        }

        # Generate unique filenames for this run
        timestamp = int(time.time())
        eng_output_file = os.path.join(
            app.config['RESULTS_FOLDER'], f"transcript_analysis_{timestamp}.md")
        hindi_output_file = os.path.join(
            app.config['RESULTS_FOLDER'], f"transcript_analysis_hindi_{timestamp}.md")

        # Step 1: Download subtitles
        subtitle_path, video_title = download_subtitles(youtube_url)

        # Step 2: Extract and clean dialogue
        raw_dialogue = extract_dialogue_from_srt(subtitle_path)

        # Step 3: Process with spaCy.
        # NOTE(review): the spaCy output is not passed to Gemini below —
        # the raw dialogue is. Kept as-is to preserve behavior; confirm
        # whether Gemini was meant to receive the annotated text.
        process_text_with_spacy(raw_dialogue)

        # Step 4: Process with Gemini
        final_results = process_with_gemini(api_key, raw_dialogue, video_title)

        # Step 5: Save English results
        save_results(final_results, eng_output_file)

        # Step 6: Translate to Hindi
        hindi_results = translate_to_hindi(api_key, final_results)

        # Step 7: Save Hindi results
        save_hindi_results(hindi_results, hindi_output_file)

        # Clean up subtitle file
        if os.path.exists(subtitle_path):
            os.remove(subtitle_path)
            update_status('cleanup', 100, f"Cleaned up temporary file: {subtitle_path}")

        processing_status['is_processing'] = False
        return {
            'success': True,
            'video_title': video_title,
            'english_file': os.path.basename(eng_output_file),
            'hindi_file': os.path.basename(hindi_output_file),
            'english_results': final_results,
            'hindi_results': hindi_results
        }

    except Exception as e:
        processing_status['is_processing'] = False
        processing_status['log'].append(
            {'time': time.strftime('%H:%M:%S'), 'message': f"Error: {str(e)}"})
        return {
            'success': False,
            'error': str(e)
        }


@app.route('/')
def index():
    """Home page with form for entering YouTube URL"""
    api_key = session.get('api_key', DEFAULT_API_KEY)
    return render_template('index.html', api_key=api_key)


@app.route('/process', methods=['POST'])
def process():
    """Start processing a YouTube URL on a background thread."""
    # NOTE(review): check-then-start is racy without a lock; acceptable
    # for the single-user dev server this app targets.
    if processing_status['is_processing']:
        return jsonify({'success': False, 'error': 'Another process is already running'})

    youtube_url = request.form.get('youtube_url', '').strip()
    # An empty api_key field falls back to the default key.
    api_key = request.form.get('api_key', '').strip() or DEFAULT_API_KEY

    if not youtube_url:
        return jsonify({'success': False, 'error': 'Please enter a valid YouTube URL'})

    # Remember the key so index() can pre-fill the form next time.
    session['api_key'] = api_key

    # Start processing in a background thread
    thread = threading.Thread(
        target=process_youtube_url,
        args=(youtube_url, api_key)
    )
    thread.daemon = True
    thread.start()

    return jsonify({'success': True, 'message': 'Processing started'})


@app.route('/status')
def status():
    """Return the current processing status"""
    return jsonify(processing_status)


# Fixed: the original route '/results/' declared no <filename> variable,
# so Flask could never supply the required argument and every request
# failed with a server error.
@app.route('/results/<filename>')
def results(filename):
    """Serve result files (send_from_directory rejects path traversal)."""
    return send_from_directory(app.config['RESULTS_FOLDER'], filename)


@app.route('/list_results')
def list_results():
    """List all available result files, newest first."""
    files = []
    for filename in os.listdir(app.config['RESULTS_FOLDER']):
        if filename.endswith('.md'):
            filepath = os.path.join(app.config['RESULTS_FOLDER'], filename)
            files.append({
                'filename': filename,
                'size': os.path.getsize(filepath),
                'created': os.path.getctime(filepath),
                'is_hindi': 'hindi' in filename.lower()
            })

    # Sort by creation time (newest first)
    files.sort(key=lambda x: x['created'], reverse=True)
    return jsonify(files)


if __name__ == '__main__':
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger to
    # the network — disable debug before any non-local deployment.
    app.run(debug=True, host='0.0.0.0', port=5000)