peace2024 commited on
Commit
e0993a0
·
1 Parent(s): dc871e9

adding worker daemon

Browse files
Files changed (2) hide show
  1. Dockerfile +4 -1
  2. worker/daemon.py +210 -0
Dockerfile CHANGED
@@ -53,9 +53,12 @@ RUN mkdir -p vector_store logs
53
  # Expose port 7860 (used by Hugging Face Spaces)
54
  EXPOSE 7860
55
 
 
 
 
56
  # Health check
57
  HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
58
  CMD curl -f http://localhost:7860/docs || exit 1
59
 
60
  # Run the FastAPI app via Uvicorn
61
- CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
 
53
  # Expose port 7860 (used by Hugging Face Spaces)
54
  EXPOSE 7860
55
 
56
+ # Install PyTorch with CUDA support (will fall back to CPU if no GPU)
57
+ RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
58
+
59
  # Health check
60
  HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
61
  CMD curl -f http://localhost:7860/docs || exit 1
62
 
63
  # Run the FastAPI app via Uvicorn
64
+ CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860", "--workers", "1"]
worker/daemon.py ADDED
@@ -0,0 +1,210 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import os
3
+ import time
4
+ import signal
5
+ import sys
6
+ from datetime import datetime
7
+ import traceback
8
+ import logging
9
+
10
+ from sqlalchemy.future import select
11
+ from sqlalchemy.ext.asyncio import AsyncSession
12
+ from sqlalchemy.exc import SQLAlchemyError
13
+
14
+ from app.database import AsyncSessionLocal, init_db, close_db
15
+ from app.models import VideoUpload
16
+ from app.utils import whisper_llm, pdf, s3, lightweight_agentic
17
+
18
+ # Setup logging with UTF-8 encoding for Windows compatibility
19
+ logging.basicConfig(
20
+ level=logging.INFO,
21
+ format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s',
22
+ handlers=[
23
+ logging.StreamHandler(sys.stdout), # Use stdout for better encoding
24
+ logging.FileHandler('worker.log', encoding='utf-8')
25
+ ]
26
+ )
27
+ logger = logging.getLogger("worker.daemon")
28
+
29
+ POLL_INTERVAL = 60 # seconds
30
+ SHUTDOWN_EVENT = asyncio.Event()
31
+
32
+
33
+ def signal_handler(signum, frame):
34
+ """Handle shutdown signals gracefully"""
35
+ logger.info(f"Received signal {signum}, initiating graceful shutdown...")
36
+ SHUTDOWN_EVENT.set()
37
+
38
+
39
+ async def process_pending_videos():
40
+ """Process all pending video uploads"""
41
+ async with AsyncSessionLocal() as session:
42
+ try:
43
+ # Query for pending videos
44
+ result = await session.execute(
45
+ select(VideoUpload).where(VideoUpload.status == "pending")
46
+ )
47
+ pending_videos = result.scalars().all()
48
+
49
+ if not pending_videos:
50
+ logger.info("No pending videos found")
51
+ return
52
+
53
+ logger.info(f"Found {len(pending_videos)} pending videos to process")
54
+
55
+ for video in pending_videos:
56
+ if SHUTDOWN_EVENT.is_set():
57
+ logger.info("Shutdown requested, stopping video processing")
58
+ break
59
+
60
+ logger.info(f"Processing video ID {video.id} for user {video.user_id}")
61
+
62
+ try:
63
+ # Update status to processing
64
+ video.status = "processing"
65
+ video.updated_at = datetime.utcnow()
66
+ await session.commit()
67
+
68
+ # Process with Lightweight Agentic Analysis (Groq + Llama3)
69
+ try:
70
+ transcription, summary = await lightweight_agentic.analyze_with_lightweight_agentic(
71
+ video_url=video.video_url,
72
+ user_id=video.user_id,
73
+ db=session
74
+ )
75
+ logger.info(f"Lightweight agentic analysis completed for video {video.id}")
76
+ except Exception as agentic_error:
77
+ logger.warning(f"Lightweight agentic analysis failed, falling back to basic Whisper: {agentic_error}")
78
+ transcription, summary = await whisper_llm.analyze(
79
+ video_url=video.video_url,
80
+ user_id=video.user_id,
81
+ db=session
82
+ )
83
+ logger.info(f"Basic Whisper analysis completed for video {video.id}")
84
+
85
+ except Exception as e:
86
+ logger.error(f"Whisper failed for video {video.id}: {e}")
87
+ logger.debug(traceback.format_exc())
88
+
89
+ # Update status to failed
90
+ video.status = "failed"
91
+ video.updated_at = datetime.utcnow()
92
+ await session.commit()
93
+ continue
94
+
95
+ try:
96
+ # Generate PDF
97
+ pdf_bytes = pdf.generate(transcription, summary)
98
+ logger.info(f"PDF generation completed for video {video.id}")
99
+ except Exception as e:
100
+ logger.error(f"PDF generation failed for video {video.id}: {e}")
101
+ logger.debug(traceback.format_exc())
102
+
103
+ video.status = "failed"
104
+ video.updated_at = datetime.utcnow()
105
+ await session.commit()
106
+ continue
107
+
108
+ try:
109
+ # Upload to S3
110
+ pdf_key = f"pdfs/{video.id}.pdf"
111
+ pdf_url = s3.upload_pdf_bytes(pdf_bytes, pdf_key)
112
+ logger.info(f"S3 upload completed for video {video.id}")
113
+ except Exception as e:
114
+ logger.error(f"Upload to S3 failed for video {video.id}: {e}")
115
+ logger.debug(traceback.format_exc())
116
+
117
+ video.status = "failed"
118
+ video.updated_at = datetime.utcnow()
119
+ await session.commit()
120
+ continue
121
+
122
+ try:
123
+ # Mark as completed
124
+ video.status = "completed"
125
+ video.pdf_url = pdf_url
126
+ video.updated_at = datetime.utcnow()
127
+ await session.commit()
128
+ logger.info(f"Successfully completed video {video.id}")
129
+
130
+ except SQLAlchemyError as e:
131
+ logger.error(f"DB commit failed for video {video.id}: {e}")
132
+ logger.debug(traceback.format_exc())
133
+ await session.rollback()
134
+
135
+ except SQLAlchemyError as e:
136
+ logger.error(f"Database error: {e}")
137
+ logger.debug(traceback.format_exc())
138
+ except Exception as e:
139
+ logger.error(f"Unexpected error in process_pending_videos: {e}")
140
+ logger.debug(traceback.format_exc())
141
+
142
+
143
+ async def run_worker():
144
+ """Main worker loop"""
145
+ logger.info("Async worker daemon started...")
146
+
147
+ # Initialize database
148
+ try:
149
+ await init_db()
150
+ logger.info("Database initialized successfully")
151
+ except Exception as e:
152
+ logger.error(f"Failed to initialize database: {e}")
153
+ return
154
+
155
+ cycle_count = 0
156
+ while not SHUTDOWN_EVENT.is_set():
157
+ cycle_count += 1
158
+ logger.info(f"Worker cycle {cycle_count} - Checking for pending videos...")
159
+
160
+ try:
161
+ await process_pending_videos()
162
+ except Exception as e:
163
+ logger.error(f"Worker loop error: {e}")
164
+ logger.debug(traceback.format_exc())
165
+
166
+ # Wait for next cycle or shutdown
167
+ try:
168
+ await asyncio.wait_for(SHUTDOWN_EVENT.wait(), timeout=POLL_INTERVAL)
169
+ except asyncio.TimeoutError:
170
+ # Normal timeout, continue to next cycle
171
+ pass
172
+ except Exception as e:
173
+ logger.error(f"Error in worker wait: {e}")
174
+ break
175
+
176
+ logger.info("Worker loop stopped, cleaning up...")
177
+
178
+ # Cleanup
179
+ try:
180
+ await close_db()
181
+ logger.info("Database connections closed")
182
+ except Exception as e:
183
+ logger.error(f"Error during cleanup: {e}")
184
+
185
+
186
+ async def main():
187
+ """Main entry point with signal handling"""
188
+ # Setup signal handlers
189
+ signal.signal(signal.SIGINT, signal_handler)
190
+ signal.signal(signal.SIGTERM, signal_handler)
191
+
192
+ try:
193
+ await run_worker()
194
+ except KeyboardInterrupt:
195
+ logger.info("Keyboard interrupt received")
196
+ except Exception as e:
197
+ logger.error(f"Fatal error in main: {e}")
198
+ logger.debug(traceback.format_exc())
199
+ finally:
200
+ logger.info("Worker daemon shutdown complete")
201
+
202
+
203
+ if __name__ == "__main__":
204
+ try:
205
+ asyncio.run(main())
206
+ except KeyboardInterrupt:
207
+ logger.info("Worker daemon interrupted by user")
208
+ except Exception as e:
209
+ logger.error(f"Fatal error: {e}")
210
+ sys.exit(1)