# Youtube_Video / app.py
from flask import Flask, render_template, request, jsonify, session, send_from_directory
import os
import re
import json
import time
import threading
import yt_dlp
import spacy
import google.generativeai as genai
app = Flask(__name__)
app.secret_key = os.urandom(24)  # Required for session support
# Configuration
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
RESULTS_FOLDER = os.path.join(os.getcwd(), 'results')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['RESULTS_FOLDER'] = RESULTS_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
# Create required directories if they don't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
# Default API key (can be overridden in the UI). Read from the environment
# rather than hardcoded so a real key is never committed with the source.
DEFAULT_API_KEY = os.environ.get("GEMINI_API_KEY", "")
# Global variable to track processing status
processing_status = {
'is_processing': False,
'current_step': '',
'progress': 0,
'log': []
}
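# Lock for the check-and-set in /process (an added guard, not in the original
# flow): the pipeline runs in a background thread while request handlers read
# and reset this dict, and without it two overlapping POSTs could both pass
# the 'is_processing' check and start concurrent pipelines.
status_lock = threading.Lock()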
# Initialize spaCy NLP pipeline, downloading the model on first run if needed
try:
    nlp = spacy.load('en_core_web_sm')
except OSError:
    import subprocess
    import sys
    # Use the current interpreter so the model installs into the right environment
    subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"], check=True)
    nlp = spacy.load('en_core_web_sm')
# Configuration for yt_dlp
YDL_OPTS = {
'skip_download': True,
'writesubtitles': True,
'writeautomaticsub': True,
'subtitleslangs': ['en'],
'outtmpl': '%(id)s.%(ext)s',
}
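def extract_json_payload(text):
    """
    Strip optional markdown code fences from a model response and return the
    JSON payload. Gemini frequently wraps structured output in ```json fences;
    every response parsed below is unwrapped with this helper.
    """
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].strip()
    return text.strip()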
def update_status(step, progress, message):
"""Update the processing status"""
processing_status['current_step'] = step
processing_status['progress'] = progress
processing_status['log'].append({'time': time.strftime('%H:%M:%S'), 'message': message})
print(f"Status: {step} - {progress}% - {message}")
def download_subtitles(video_url):
"""
Downloads (auto-)subtitles for the given YouTube URL.
Returns the filename of the downloaded subtitle file (.srt or .vtt) and video title.
"""
update_status('download_subtitles', 10, f"Downloading subtitles for {video_url}...")
with yt_dlp.YoutubeDL(YDL_OPTS) as ydl:
info = ydl.extract_info(video_url, download=True)
video_id = info.get('id')
video_title = info.get('title', 'Unknown Title')
update_status('download_subtitles', 20, f"Video title: {video_title}")
    # Check standard filename patterns (yt-dlp writes e.g. "<id>.en.vtt" here)
    for ext in ('.en.vtt', '.en.srt', '.vtt', '.srt'):
        fname = f"{video_id}{ext}"
        if os.path.exists(fname):
            update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
            return fname, video_title
# Fallback: find any subtitle file for this video_id
for fname in os.listdir('.'):
if fname.startswith(video_id) and fname.lower().endswith(('.srt', '.vtt')):
update_status('download_subtitles', 30, f"Found subtitle file: {fname}")
return fname, video_title
raise FileNotFoundError(f"Subtitle file for {video_id} not found.")
def extract_dialogue_from_srt(path):
"""
Reads a subtitle file (.srt or .vtt), removes timestamps and metadata,
and returns cleaned dialogue as a single string.
"""
update_status('extract_dialogue', 40, f"Extracting dialogue from {path}...")
    pattern_timestamp = re.compile(r"^\d{2}:\d{2}(?::\d{2})?[\.,]\d+ -->")
cleaned_lines = []
with open(path, 'r', encoding='utf-8', errors='replace') as f:
for line in f:
line = line.strip()
# Skip empty, index, timestamp, or styling lines
if not line or re.match(r"^\d+$", line) or pattern_timestamp.match(line) or line.startswith(
('WEBVTT', 'Kind:', 'Language:')):
continue
            # Remove inline tags (e.g. the <c> word-timing markup in auto-generated VTT)
            text = re.sub(r"<[^>]+>", "", line).strip()
            # Skip empty results and consecutive duplicates: YouTube auto-captions
            # repeat rolling cue lines, which would otherwise double the dialogue
            if text and (not cleaned_lines or text != cleaned_lines[-1]):
                cleaned_lines.append(text)
    # Join the cleaned lines into a single block of text
    dialogue = " ".join(cleaned_lines)
# Clean up multiple spaces
dialogue = re.sub(r'\s+', ' ', dialogue)
return dialogue
def process_text_with_spacy(text):
"""
Runs spaCy NLP pipeline to perform sentence segmentation,
highlight named entities, and returns a formatted string.
"""
update_status('process_text_with_spacy', 50, "Processing text with spaCy...")
doc = nlp(text)
formatted = []
for sent in doc.sents:
sent_text = sent.text.strip()
# Skip empty sentences or sentences with just punctuation
if len(sent_text) <= 1:
continue
entities = {}
for ent in sent.ents:
entities[ent.text] = ent.label_
        if entities:
            # Replace longer entities first so a substring entity (e.g. "York"
            # inside "New York") cannot corrupt markup inserted for the longer one
            for entity, label in sorted(entities.items(), key=lambda kv: len(kv[0]), reverse=True):
                sent_text = sent_text.replace(entity, f"**{entity} ({label})**")
formatted.append(sent_text)
return "\n\n".join(formatted)
def process_with_gemini(api_key, text, video_title):
"""
Sends the processed transcript to Gemini API for final formatting and analysis.
"""
update_status('process_with_gemini', 60, "Sending to Gemini for final processing...")
# Configure the Gemini API
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-2.0-flash')
prompt = f"""
I'm providing a transcript from the YouTube video titled: "{video_title}"
Please analyze this transcript and return a JSON object with the following fields:
1. "summary": An array of bullet points summarizing key points (5-7 items)
2. "topics": An array of main topics discussed (3-5 items)
3. "formatted_transcript": A well-formatted version of the transcript
4. "notable_quotes": An array of 3-5 notable quotes from the transcript
Here's the raw transcript:
{text}
Return your analysis as a valid JSON object containing all requested fields.
"""
response = model.generate_content(prompt)
try:
        # Strip any markdown code fences Gemini may add, then parse the JSON
        result = json.loads(extract_json_payload(response.text))
update_status('process_with_gemini', 70, "Gemini processing complete")
return result
except json.JSONDecodeError:
# If JSON parsing fails, return a structured response with the raw text
update_status('process_with_gemini', 70, "Warning: Could not parse Gemini response as JSON")
return {
"summary": ["Unable to parse Gemini response as JSON"],
"topics": ["Error in processing"],
"formatted_transcript": response.text,
"notable_quotes": []
}
def translate_to_hindi(api_key, results):
"""
Translates the processed results to Hindi using Gemini AI.
"""
update_status('translate_to_hindi', 80, "Translating results to Hindi using Gemini...")
# Configure the Gemini API
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-2.0-flash') # Using flash model for faster response
# Create a copy of the results for Hindi translation
hindi_results = {
"summary": [],
"topics": [],
"formatted_transcript": "",
"notable_quotes": []
}
# Translate summary points
summary_prompt = f"""
Translate the following English bullet points to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["summary"], indent=2)}
Return the result as a JSON array.
"""
summary_response = model.generate_content(summary_prompt)
try:
        # Strip code fences and parse the translated JSON array
        hindi_results["summary"] = json.loads(extract_json_payload(summary_response.text))
update_status('translate_to_hindi', 82, "Summary translation complete.")
except Exception as e:
update_status('translate_to_hindi', 82, f"Error in summary translation: {e}")
# Fallback: process items individually
for point in results["summary"]:
prompt = f"Translate this to Hindi: {point}"
response = model.generate_content(prompt)
hindi_results["summary"].append(response.text.strip())
# Translate topics
topics_prompt = f"""
Translate the following English topics to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["topics"], indent=2)}
Return the result as a JSON array.
"""
topics_response = model.generate_content(topics_prompt)
try:
        # Strip code fences and parse the translated JSON array
        hindi_results["topics"] = json.loads(extract_json_payload(topics_response.text))
update_status('translate_to_hindi', 85, "Topics translation complete.")
except Exception as e:
update_status('translate_to_hindi', 85, f"Error in topics translation: {e}")
# Fallback
for topic in results["topics"]:
prompt = f"Translate this to Hindi: {topic}"
response = model.generate_content(prompt)
hindi_results["topics"].append(response.text.strip())
# Translate notable quotes
quotes_prompt = f"""
Translate the following English quotes to Hindi.
Keep formatting and meaning intact:
{json.dumps(results["notable_quotes"], indent=2)}
Return ONLY the translated Hindi text in JSON array format.
"""
quotes_response = model.generate_content(quotes_prompt)
try:
        # Strip code fences and parse the translated JSON array
        hindi_results["notable_quotes"] = json.loads(extract_json_payload(quotes_response.text))
update_status('translate_to_hindi', 88, "Quotes translation complete.")
except Exception as e:
update_status('translate_to_hindi', 88, f"Error in quotes translation: {e}")
# Fallback
for quote in results["notable_quotes"]:
prompt = f"Translate this to Hindi: {quote}"
response = model.generate_content(prompt)
hindi_results["notable_quotes"].append(response.text.strip())
# Translate the formatted transcript (may need to be chunked for long texts)
transcript = results["formatted_transcript"]
# Split transcript into paragraphs
paragraphs = transcript.split("\n\n")
translated_paragraphs = []
# Process paragraphs in batches
batch_size = 5 # Adjust based on average paragraph length
total_paragraphs = len(paragraphs)
for i in range(0, total_paragraphs, batch_size):
batch = paragraphs[i:i + batch_size]
batch_text = "\n\n".join(batch)
progress = 88 + (i / total_paragraphs * 10) # Scale from 88% to 98%
update_status('translate_to_hindi', int(progress),
f"Translating transcript paragraphs {i + 1} to {min(i + batch_size, total_paragraphs)} of {total_paragraphs}")
translate_prompt = f"""
Translate the following English text to Hindi.
Preserve paragraph breaks and formatting:
{batch_text}
Return ONLY the translated Hindi text.
"""
try:
response = model.generate_content(translate_prompt)
translated_batch = response.text.strip()
translated_paragraphs.append(translated_batch)
except Exception as e:
update_status('translate_to_hindi', int(progress), f"Error in batch translation: {e}")
# Fallback: translate paragraph by paragraph
for para in batch:
try:
prompt = f"Translate this to Hindi: {para}"
response = model.generate_content(prompt)
translated_paragraphs.append(response.text.strip())
                except Exception:
                    # On failure, record a placeholder containing a snippet of the original
                    translated_paragraphs.append(f"[Translation error: {para[:50]}...]")
# Join all translated content
hindi_results["formatted_transcript"] = "\n\n".join(translated_paragraphs)
update_status('translate_to_hindi', 98, "Transcript translation complete.")
return hindi_results
def save_results(results, output_file):
"""
Saves the processed results to a file.
"""
with open(output_file, 'w', encoding='utf-8') as f:
# First write a markdown-formatted version
f.write(f"# Transcript Analysis\n\n")
f.write("## Summary\n")
for point in results["summary"]:
f.write(f"- {point}\n")
f.write("\n")
f.write("## Topics\n")
for topic in results["topics"]:
f.write(f"- {topic}\n")
f.write("\n")
f.write("## Notable Quotes\n")
for quote in results["notable_quotes"]:
f.write(f"> {quote}\n\n")
f.write("\n")
f.write("## Formatted Transcript\n\n")
f.write(results["formatted_transcript"])
f.write("\n\n")
# Also save the raw JSON
f.write("---\n\n")
f.write("```json\n")
        json.dump(results, f, indent=2, ensure_ascii=False)
f.write("\n```\n")
update_status('save_results', 99, f"Results saved to {output_file}")
def save_hindi_results(hindi_results, output_file):
"""
Saves the Hindi translated results to a file.
"""
with open(output_file, 'w', encoding='utf-8') as f:
# First write a markdown-formatted version
f.write(f"# प्रतिलेख विश्लेषण\n\n")
f.write("## सारांश\n")
for point in hindi_results["summary"]:
f.write(f"- {point}\n")
f.write("\n")
f.write("## विषय\n")
for topic in hindi_results["topics"]:
f.write(f"- {topic}\n")
f.write("\n")
f.write("## उल्लेखनीय उद्धरण\n")
for quote in hindi_results["notable_quotes"]:
f.write(f"> {quote}\n\n")
f.write("\n")
f.write("## स्वरूपित प्रतिलेख\n\n")
f.write(hindi_results["formatted_transcript"])
f.write("\n\n")
# Also save the raw JSON
f.write("---\n\n")
f.write("```json\n")
json.dump(hindi_results, f, indent=2, ensure_ascii=False)
f.write("\n```\n")
update_status('save_hindi_results', 100, f"Hindi results saved to {output_file}")
def process_youtube_url(youtube_url, api_key):
"""Process a YouTube URL and return the analysis results"""
global processing_status
try:
processing_status = {
'is_processing': True,
'current_step': 'Starting',
'progress': 0,
'log': []
}
# Generate unique filenames for this run
timestamp = int(time.time())
eng_output_file = os.path.join(app.config['RESULTS_FOLDER'], f"transcript_analysis_{timestamp}.md")
hindi_output_file = os.path.join(app.config['RESULTS_FOLDER'], f"transcript_analysis_hindi_{timestamp}.md")
# Step 1: Download subtitles
subtitle_path, video_title = download_subtitles(youtube_url)
# Step 2: Extract and clean dialogue
raw_dialogue = extract_dialogue_from_srt(subtitle_path)
        # Step 3: Process with spaCy (entity-highlighted text; informational for
        # now, since the Gemini step below receives the raw dialogue, as its prompt states)
        nlp_processed = process_text_with_spacy(raw_dialogue)
# Step 4: Process with Gemini
final_results = process_with_gemini(api_key, raw_dialogue, video_title)
# Step 5: Save English results
save_results(final_results, eng_output_file)
# Step 6: Translate to Hindi
hindi_results = translate_to_hindi(api_key, final_results)
# Step 7: Save Hindi results
save_hindi_results(hindi_results, hindi_output_file)
# Clean up subtitle file
if os.path.exists(subtitle_path):
os.remove(subtitle_path)
update_status('cleanup', 100, f"Cleaned up temporary file: {subtitle_path}")
processing_status['is_processing'] = False
return {
'success': True,
'video_title': video_title,
'english_file': os.path.basename(eng_output_file),
'hindi_file': os.path.basename(hindi_output_file),
'english_results': final_results,
'hindi_results': hindi_results
}
except Exception as e:
processing_status['is_processing'] = False
processing_status['log'].append({'time': time.strftime('%H:%M:%S'), 'message': f"Error: {str(e)}"})
return {
'success': False,
'error': str(e)
}
@app.route('/')
def index():
"""Home page with form for entering YouTube URL"""
api_key = session.get('api_key', DEFAULT_API_KEY)
return render_template('index.html', api_key=api_key)
@app.route('/process', methods=['POST'])
def process():
    """Start processing a YouTube URL"""
    youtube_url = request.form.get('youtube_url', '').strip()
    # Fall back to the default key when the form field is missing or empty
    api_key = (request.form.get('api_key') or DEFAULT_API_KEY).strip()
    if not youtube_url:
        return jsonify({'success': False, 'error': 'Please enter a valid YouTube URL'})
    # Check-and-set under the lock so two concurrent requests cannot both start
    with status_lock:
        if processing_status['is_processing']:
            return jsonify({'success': False, 'error': 'Another process is already running'})
        processing_status['is_processing'] = True
    # Start processing in a background thread
    thread = threading.Thread(
        target=process_youtube_url,
        args=(youtube_url, api_key)
    )
    thread.daemon = True
    thread.start()
    return jsonify({'success': True, 'message': 'Processing started'})
@app.route('/status')
def status():
"""Return the current processing status"""
return jsonify(processing_status)
@app.route('/results/<filename>')
def results(filename):
"""Serve result files"""
return send_from_directory(app.config['RESULTS_FOLDER'], filename)
@app.route('/list_results')
def list_results():
"""List all available result files"""
files = []
for filename in os.listdir(app.config['RESULTS_FOLDER']):
if filename.endswith('.md'):
filepath = os.path.join(app.config['RESULTS_FOLDER'], filename)
files.append({
'filename': filename,
'size': os.path.getsize(filepath),
'created': os.path.getctime(filepath),
'is_hindi': 'hindi' in filename.lower()
})
# Sort by creation time (newest first)
files.sort(key=lambda x: x['created'], reverse=True)
return jsonify(files)
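# Example interaction once the server is running (VIDEO_ID and the key are
# placeholders, not values from this repository):
#   curl -X POST http://localhost:5000/process \
#        -d "youtube_url=https://www.youtube.com/watch?v=VIDEO_ID" \
#        -d "api_key=YOUR_GEMINI_API_KEY"
#   curl http://localhost:5000/status
#   curl http://localhost:5000/list_results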
if __name__ == '__main__':
    # debug=True is convenient locally but should be disabled when the app is
    # exposed on 0.0.0.0, since the Werkzeug debugger allows code execution
    app.run(debug=True, host='0.0.0.0', port=5000)