"""Polling worker that converts pending video uploads into PDF summaries.

Each cycle selects every ``VideoUpload`` whose status is ``"pending"``, runs
it through ``whisper_llm.analyze`` -> ``pdf.generate`` -> ``s3.upload_pdf_bytes``
and, on success, marks the row ``"completed"`` with the resulting PDF URL.
A video that fails at any step is left untouched (still ``"pending"``), so it
is picked up again on the next cycle.
"""
import asyncio
import os
import time
import traceback
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select

from app.database import AsyncSessionLocal
from app.models import VideoUpload
from app.utils import pdf, s3, whisper_llm

POLL_INTERVAL = 200  # seconds to sleep between polling cycles


async def _rollback_quietly(session):
    """Roll back ``session`` so it can be reused; log instead of raising."""
    try:
        await session.rollback()
    except Exception as e:
        print(f"❌ DB rollback failed: {e}")
        traceback.print_exc()


async def _process_video(session, video, video_id, user_id, video_url):
    """Run one video through transcription, PDF generation, upload and DB update.

    Args:
        session: The active AsyncSession that loaded ``video``.
        video: The ``VideoUpload`` row to mark as completed on success.
        video_id: Value of ``video.id`` captured right after the query.
        user_id: Value of ``video.user_id`` captured right after the query.
        video_url: Value of ``video.video_url`` captured right after the query.

    The captured values are used instead of reading ``video.<attr>`` because a
    commit or rollback may expire the ORM object, and an implicit reload is
    not possible from plain attribute access under asyncio.

    Every failure is logged and swallowed; the function never raises for an
    error in one of the four steps.
    """
    print(f"🎬 Processing video ID {video_id} for user {user_id}")

    try:
        transcription, summary = await whisper_llm.analyze(
            video_url=video_url,
            user_id=user_id,
            db=session,  # analyze() works on the same active AsyncSession
        )
    except Exception as e:
        print(f"❌ Whisper failed for video {video_id}: {e}")
        traceback.print_exc()
        # analyze() had access to the session; if it failed on a DB statement
        # the transaction is unusable until rolled back.
        await _rollback_quietly(session)
        return

    try:
        pdf_bytes = pdf.generate(transcription, summary)
    except Exception as e:
        print(f"❌ PDF generation failed for video {video_id}: {e}")
        traceback.print_exc()
        return

    try:
        pdf_key = f"pdfs/{video_id}.pdf"
        pdf_url = s3.upload_pdf_bytes(pdf_bytes, pdf_key)
    except Exception as e:
        print(f"❌ Upload to S3 failed for video {video_id}: {e}")
        traceback.print_exc()
        return

    try:
        video.status = "completed"
        video.pdf_url = pdf_url
        # Naive UTC timestamp, same value datetime.utcnow() produced, without
        # calling the deprecated API.
        video.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
        await session.commit()
    except Exception as e:
        print(f"❌ DB commit failed for video {video_id}: {e}")
        traceback.print_exc()
        # Discard the failed changes so the remaining videos in this batch
        # can still be committed on the same session.
        await _rollback_quietly(session)
        return

    print(f"✅ Completed video {video_id}")


async def process_pending_videos():
    """Process every video currently marked ``"pending"``, one at a time.

    Opens a fresh session, loads the pending ``VideoUpload`` rows and hands
    each one to ``_process_video``. Errors are printed with a traceback and
    never propagate to the caller.
    """
    async with AsyncSessionLocal() as session:
        try:
            result = await session.execute(
                select(VideoUpload).where(VideoUpload.status == "pending")
            )
            # Snapshot the plain column values while the rows are freshly
            # loaded: the first commit (or a commit/rollback inside
            # whisper_llm.analyze) may expire all loaded objects.
            pending_videos = [
                (video, video.id, video.user_id, video.video_url)
                for video in result.scalars().all()
            ]

            for video, video_id, user_id, video_url in pending_videos:
                await _process_video(
                    session, video, video_id, user_id, video_url
                )
        except Exception as e:
            print(f"❌ DB error: {e}")
            traceback.print_exc()


async def run_worker():
    """Poll for pending videos forever, sleeping POLL_INTERVAL seconds between cycles."""
    print("🚀 Async worker started (Neon)...")
    while True:
        print("🔁 Checking for pending videos...")
        try:
            await process_pending_videos()
        except Exception as e:
            # Last-resort guard so one bad cycle never stops the worker.
            print(f"❌ Worker loop crashed: {e}")
            traceback.print_exc()
        await asyncio.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    asyncio.run(run_worker())