import asyncio
import os
import time
from datetime import datetime, timezone

from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import AsyncSessionLocal
from app.models import VideoUpload
from app.utils import whisper_llm, pdf, s3

POLL_INTERVAL = 200  # seconds


async def _run_blocking(func, *args):
    """Run a synchronous callable in the loop's default executor.

    Keeps the event loop responsive while ``func(*args)`` executes, and
    propagates whatever ``func`` returns or raises.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def process_pending_videos():
    """Process every ``VideoUpload`` row whose status is ``"pending"``.

    For each pending row: analyze the video via ``whisper_llm.analyze``,
    build a PDF from the transcription and summary, upload it under the key
    ``pdfs/<id>.pdf``, then mark the row ``"completed"`` with the resulting
    ``pdf_url`` and a fresh ``updated_at``, committing after each video.

    A failure in the analyze, PDF, or upload step is printed and that video
    is skipped (its row is left untouched). Any other exception, including a
    failed commit, is printed as a DB error and ends the current pass; it is
    not re-raised.
    """
    async with AsyncSessionLocal() as session:
        try:
            result = await session.execute(
                select(VideoUpload).where(VideoUpload.status == "pending")
            )
            pending_videos = result.scalars().all()

            # Snapshot the scalar fields before the first commit. If the
            # session factory uses SQLAlchemy's default expire_on_commit=True
            # (its configuration is not visible from this file), every loaded
            # row is expired by commit(), and reading an expired attribute
            # from coroutine code on an AsyncSession raises MissingGreenlet.
            jobs = [
                (video, video.id, video.user_id, video.video_url)
                for video in pending_videos
            ]

            for video, video_id, user_id, video_url in jobs:
                print(f"🎬 Processing video ID {video_id} for user {user_id}")

                # The three helpers below are invoked without ``await`` in
                # this module, i.e. they are synchronous; run them in a
                # worker thread so they do not block the event loop.
                try:
                    transcription, summary = await _run_blocking(
                        whisper_llm.analyze, video_url
                    )
                except Exception as e:
                    print(f"❌ Whisper failed: {e}")
                    continue

                try:
                    pdf_bytes = await _run_blocking(
                        pdf.generate, transcription, summary
                    )
                except Exception as e:
                    print(f"❌ PDF generation failed: {e}")
                    continue

                try:
                    pdf_key = f"pdfs/{video_id}.pdf"
                    pdf_url = await _run_blocking(
                        s3.upload_pdf_bytes, pdf_bytes, pdf_key
                    )
                except Exception as e:
                    print(f"❌ Upload to S3 failed: {e}")
                    continue

                video.status = "completed"
                video.pdf_url = pdf_url
                # Same naive-UTC value datetime.utcnow() produced, without
                # calling the deprecated API.
                video.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)

                await session.commit()
                print(f"✅ Completed video {video_id}")

        except Exception as e:
            print(f"❌ DB error: {e}")


async def run_worker():
    """Poll for pending videos forever, sleeping ``POLL_INTERVAL`` seconds between passes.

    An exception escaping ``process_pending_videos`` is printed and the loop
    keeps running.
    """
    print("🚀 Async worker started (Neon)...")
    while True:
        print("🔁 Checking for pending videos...")
        try:
            await process_pending_videos()
        except Exception as e:
            print(f"❌ Worker loop crashed: {e}")
        await asyncio.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    asyncio.run(run_worker())