from flask import Flask, render_template, request, jsonify, send_file, flash, redirect, url_for
import os
import subprocess
import json
from werkzeug.utils import secure_filename
import threading
import time
import moviepy.editor as mp
from PIL import Image, ImageDraw, ImageFont
import io
import base64
import random
from functools import lru_cache
import concurrent.futures
import tempfile
import shlex
import re
app = Flask(__name__)
app.secret_key = 'transcricao_audio_key_2024'
# Configurações
UPLOAD_FOLDER = 'uploads'
OUTPUT_FOLDER = 'outputs'
TEMP_FOLDER = 'temp'
ALLOWED_EXTENSIONS = {'mp3', 'wav', 'mp4', 'm4a', 'flac', 'ogg'}
ALLOWED_TRANSCRIPTION = {'json'}
MAX_WORKERS = 4 # Número máximo de workers para processamento paralelo
# Criar pastas se não existirem
for folder in [UPLOAD_FOLDER, OUTPUT_FOLDER, TEMP_FOLDER]:
os.makedirs(folder, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['OUTPUT_FOLDER'] = OUTPUT_FOLDER
app.config['TEMP_FOLDER'] = TEMP_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 500 * 1024 * 1024 # 500MB max
# Cache para fontes
FONT_CACHE = {}
# Status dos processamentos
processing_status = {}
# Lock para operações críticas
processing_lock = threading.Lock()
status_lock = threading.Lock()
def update_processing_status(job_id, status, message=None, output_file=None):
"""Atualiza status de processamento de forma thread-safe"""
with status_lock:
if job_id not in processing_status:
processing_status[job_id] = {}
if status:
processing_status[job_id]['status'] = status
if message:
processing_status[job_id]['message'] = message
if output_file:
processing_status[job_id]['output_file'] = output_file
def allowed_file(filename, allowed_extensions):
"""Verifica se o arquivo é permitido e seguro"""
if not filename or '.' not in filename:
return False
ext = filename.rsplit('.', 1)[1].lower()
return ext in allowed_extensions
def secure_path(path):
"""Garante que o caminho é seguro"""
return os.path.normpath(path).replace('\\', '/').lstrip('/')
def validate_file(file, allowed_extensions):
"""Valida arquivo enviado"""
if not file or file.filename == '':
return False, 'Nenhum arquivo selecionado'
if not allowed_file(file.filename, allowed_extensions):
return False, f'Formato não suportado. Use: {", ".join(allowed_extensions)}'
# Verificar tamanho
if file.content_length and file.content_length > app.config['MAX_CONTENT_LENGTH']:
return False, 'Arquivo muito grande. Máximo: 500MB'
return True, None
@lru_cache(maxsize=2)
def load_font(size):
"""Carrega e cacheia fontes"""
try:
return ImageFont.truetype("/usr/share/fonts/truetype/open-sans/OpenSans-Regular.ttf", size)
except:
try:
return ImageFont.truetype("/usr/share/fonts/liberation/LiberationSans-Regular.ttf", size)
except:
return ImageFont.load_default()
def create_slide_image(text, width=1920, height=1080):
"""Cria imagem do slide com otimizações"""
# Usar modo L para texto preto e branco (menor tamanho)
img = Image.new('RGB', (width, height), 'white')
draw = ImageDraw.Draw(img)
# Usar fonte cacheada
font = load_font(42)
# Processar texto para identificar palavras em negrito
words = []
current_pos = 0
text_without_tags = ""
while current_pos < len(text):
if text[current_pos:].startswith(''):
end_pos = text.find('', current_pos)
if end_pos != -1:
word = text[current_pos+23:end_pos]
words.append(('red-bold', word))
text_without_tags += word + " "
current_pos = end_pos + 7
continue
elif text[current_pos:].startswith(''):
end_pos = text.find('', current_pos)
if end_pos != -1:
word = text[current_pos+25:end_pos]
words.append(('black-bold', word))
text_without_tags += word + " "
current_pos = end_pos + 7
continue
text_without_tags += text[current_pos]
current_pos += 1
# Quebrar texto em linhas (máximo 2)
words_clean = text_without_tags.split()
lines = []
current_line = []
max_width = width * 0.8
for word in words_clean:
test_line = ' '.join(current_line + [word])
bbox = draw.textbbox((0, 0), test_line, font=font)
line_width = bbox[2] - bbox[0]
if line_width <= max_width:
current_line.append(word)
else:
if len(lines) < 2:
lines.append(' '.join(current_line))
current_line = [word]
else:
break
if current_line and len(lines) < 2:
lines.append(' '.join(current_line))
# Calcular posição vertical
total_height = sum(draw.textbbox((0, 0), line, font=font)[3] - draw.textbbox((0, 0), line, font=font)[1] for line in lines)
total_height += (len(lines) - 1) * 20 # Espaçamento entre linhas
y = (height - total_height) // 2
# Desenhar texto
for line in lines:
bbox = draw.textbbox((0, 0), line, font=font)
x = (width - (bbox[2] - bbox[0])) // 2
draw.text((x, y), line, font=font, fill='black')
y += bbox[3] - bbox[1] + 20
# Otimizar imagem
img_buffer = io.BytesIO()
img.save(img_buffer, format='PNG', optimize=True)
return img_buffer.getvalue()
def process_slides_parallel(slides, temp_dir):
"""Processa slides em paralelo"""
slide_paths = []
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_slide = {
executor.submit(
create_slide_image,
slide['text']
): (i, slide) for i, slide in enumerate(slides)
}
for future in concurrent.futures.as_completed(future_to_slide):
i, slide = future_to_slide[future]
try:
img_data = future.result()
path = os.path.join(temp_dir, f'slide_{i:03d}.png')
with open(path, 'wb') as f:
f.write(img_data)
slide_paths.append((path, slide['start']))
except Exception as e:
print(f"Erro ao processar slide {i}: {e}")
continue
return sorted(slide_paths, key=lambda x: x[1])
@app.route('/export_video', methods=['POST'])
def export_video():
"""Exporta vídeo com otimizações"""
try:
data = request.json
slides = data.get('slides', [])
audio_data = data.get('audio', '')
if not slides:
return jsonify({'error': 'Nenhum slide fornecido'}), 400
# Usar diretório temporário do sistema
with tempfile.TemporaryDirectory() as temp_dir:
try:
# Processar slides em paralelo
slide_paths = process_slides_parallel(slides, temp_dir)
# Criar clips de vídeo
clips = []
video = None
audio = None
try:
for path, start_time in slide_paths:
clip = mp.ImageClip(path, duration=3.0)
clip = clip.set_start(start_time)
clips.append(clip)
# Concatenar clips
video = mp.CompositeVideoClip(clips)
# Processar áudio
if audio_data:
try:
audio_bytes = base64.b64decode(audio_data.split(',')[1])
audio_path = os.path.join(temp_dir, 'audio.mp3')
with open(audio_path, 'wb') as f:
f.write(audio_bytes)
audio = mp.AudioFileClip(audio_path)
video = video.set_duration(audio.duration).set_audio(audio)
except Exception as e:
print(f"Erro ao processar áudio: {e}")
# Continuar sem áudio
# Exportar vídeo otimizado
output_path = os.path.join(temp_dir, 'output.mp4')
video.write_videofile(
output_path,
fps=30,
codec='libx264',
audio_codec='aac',
audio_bitrate='192k',
bitrate='8000k',
preset='medium',
threads=MAX_WORKERS,
logger=None
)
return send_file(
output_path,
as_attachment=True,
download_name='vsl_video.mp4',
mimetype='video/mp4'
)
finally:
# Limpar recursos
if video:
video.close()
if audio:
audio.close()
for clip in clips:
clip.close()
except Exception as e:
print(f"Erro ao processar vídeo: {e}")
raise
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/upload_transcription', methods=['POST'])
def upload_transcription():
"""Upload de arquivo de transcrição"""
if 'file' not in request.files:
return jsonify({'success': False, 'error': 'Nenhum arquivo enviado'})
file = request.files['file']
is_valid, error = validate_file(file, ALLOWED_TRANSCRIPTION)
if not is_valid:
return jsonify({'success': False, 'error': error})
try:
filename = secure_filename(file.filename)
if not filename.endswith('_transcricao.json'):
filename = filename.rsplit('.', 1)[0] + '_transcricao.json'
filepath = os.path.join(app.config['OUTPUT_FOLDER'], filename)
filepath = secure_path(filepath)
# Verificar se o diretório de destino existe
os.makedirs(os.path.dirname(filepath), exist_ok=True)
file.save(filepath)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/delete_transcription/')
def delete_transcription(filename):
"""Deleta uma transcrição"""
try:
filepath = os.path.join(app.config['OUTPUT_FOLDER'], filename)
if os.path.exists(filepath):
os.remove(filepath)
return jsonify({'success': True})
return jsonify({'success': False, 'error': 'Arquivo não encontrado'})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
def sanitize_filename(filename):
"""Sanitiza nome de arquivo removendo caracteres perigosos"""
# Remover caracteres não seguros
filename = re.sub(r'[^a-zA-Z0-9._-]', '', filename)
# Evitar nomes de arquivo que começam com . ou -
filename = filename.lstrip('.-')
return filename if filename else 'unnamed'
def run_transcription(audio_path, job_id):
"""Executa a transcrição em background com proteção contra injeção"""
try:
print(f"[{job_id}] Iniciando transcrição para {audio_path}")
update_processing_status(job_id, 'processing', 'Iniciando transcrição...')
# Sanitizar caminho do arquivo
audio_path = os.path.abspath(audio_path)
if not os.path.exists(audio_path):
raise ValueError("Arquivo de áudio não encontrado")
# Validar extensão
if not allowed_file(audio_path, ALLOWED_EXTENSIONS):
raise ValueError("Formato de arquivo inválido")
with processing_lock:
# Usar shlex.quote para escapar argumentos
cmd = [
'python',
'transcriptor.py',
shlex.quote(audio_path)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd='.',
shell=False # Evitar shell injection
)
print(f"[{job_id}] Transcrição finalizada com código {result.returncode}")
if result.stdout:
print(f"[{job_id}] STDOUT: {result.stdout}")
if result.stderr:
print(f"[{job_id}] STDERR: {result.stderr}")
base_name = sanitize_filename(os.path.splitext(os.path.basename(audio_path))[0])
json_file = f"{base_name}_transcricao.json"
if result.returncode == 0:
if os.path.exists(json_file):
output_path = os.path.join(OUTPUT_FOLDER, json_file)
# Usar operação atômica para mover arquivo
os.replace(json_file, output_path)
update_processing_status(
job_id,
'completed',
'Transcrição concluída com sucesso!',
json_file
)
print(f"[{job_id}] JSON salvo em: {output_path}")
else:
update_processing_status(
job_id,
'error',
'Arquivo JSON não foi gerado'
)
print(f"[{job_id}] ERRO: JSON não encontrado: {json_file}")
else:
update_processing_status(
job_id,
'error',
f'Erro na transcrição: {result.stderr}'
)
print(f"[{job_id}] ERRO na execução do script")
except Exception as e:
update_processing_status(
job_id,
'error',
f'Erro interno: {str(e)}'
)
print(f"[{job_id}] EXCEPTION: {str(e)}")
finally:
# Limpar arquivo de áudio após processamento
try:
if os.path.exists(audio_path):
os.remove(audio_path)
except Exception as e:
print(f"[{job_id}] Erro ao limpar arquivo: {e}")
@app.route('/')
def index():
return render_template('dashboard.html')
@app.route('/transcricao')
def transcricao():
return render_template('index.html')
@app.route('/vsl')
def vsl():
return render_template('vsl.html')
@app.route('/upload', methods=['POST'])
def upload_file():
"""Upload de arquivo de áudio com validações de segurança"""
if 'file' not in request.files:
flash('Nenhum arquivo selecionado')
return redirect(request.url)
file = request.files['file']
is_valid, error = validate_file(file, ALLOWED_EXTENSIONS)
if not is_valid:
flash(error)
return redirect(request.url)
try:
# Sanitizar nome do arquivo
filename = sanitize_filename(secure_filename(file.filename))
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
filepath = secure_path(filepath)
# Verificar se o diretório de destino existe
os.makedirs(os.path.dirname(filepath), exist_ok=True)
file.save(filepath)
# Criar job ID único usando hash
job_id = f"{int(time.time())}_{hash(filename)}"
processing_status[job_id] = {
'status': 'queued',
'message': 'Aguardando processamento...',
'filename': filename
}
# Iniciar transcrição em background
thread = threading.Thread(target=run_transcription, args=(filepath, job_id))
thread.daemon = True
thread.start()
return render_template('processing.html', job_id=job_id, filename=filename)
except Exception as e:
flash(f'Erro ao processar arquivo: {str(e)}')
return redirect(url_for('index'))
@app.route('/status/')
def check_status(job_id):
"""Verifica status do processamento de forma thread-safe"""
with status_lock:
if job_id in processing_status:
return jsonify(processing_status[job_id])
return jsonify({
'status': 'error',
'message': 'Job não encontrado'
}), 404
@app.route('/download/')
def download_file(filename):
try:
file_path = os.path.join(OUTPUT_FOLDER, filename)
if os.path.exists(file_path):
return send_file(file_path, as_attachment=True)
else:
flash('Arquivo não encontrado')
return redirect(url_for('index'))
except Exception as e:
flash(f'Erro ao baixar arquivo: {str(e)}')
return redirect(url_for('index'))
@app.route('/results')
def results():
output_files = []
if os.path.exists(OUTPUT_FOLDER):
for filename in os.listdir(OUTPUT_FOLDER):
if filename.endswith('.json'):
filepath = os.path.join(OUTPUT_FOLDER, filename)
file_stats = os.stat(filepath)
output_files.append({
'name': filename,
'size': round(file_stats.st_size / 1024, 2),
'modified': time.ctime(file_stats.st_mtime)
})
return render_template('results.html', files=output_files)
@app.route('/list_transcriptions')
def list_transcriptions():
"""Lista todas as transcrições disponíveis no servidor"""
files = []
if os.path.exists(OUTPUT_FOLDER):
for filename in os.listdir(OUTPUT_FOLDER):
if filename.endswith('.json'):
file_path = os.path.join(OUTPUT_FOLDER, filename)
stats = os.stat(file_path)
files.append({
'name': filename,
'path': file_path,
'modified': stats.st_mtime * 1000 # Converter para milissegundos para JavaScript
})
return jsonify(files)
@app.route('/get_transcription/')
def get_transcription(filename):
"""Retorna o conteúdo de uma transcrição específica"""
try:
file_path = os.path.join(OUTPUT_FOLDER, filename)
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return jsonify(json.load(f))
else:
return jsonify({'error': 'Arquivo não encontrado'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/save_transcription', methods=['POST'])
def save_transcription():
"""Salva uma transcrição editada"""
try:
data = request.json
if not data or 'metadata' not in data or 'words' not in data:
return jsonify({'success': False, 'error': 'Formato de transcrição inválido'})
# Gerar nome do arquivo baseado no arquivo original
base_name = os.path.splitext(os.path.basename(data['metadata']['arquivo']))[0]
output_file = f"{base_name}_transcricao.json"
output_path = os.path.join(OUTPUT_FOLDER, output_file)
# Salvar arquivo
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)