# app/tasks/dataset_tasks.py
import logging
import os

from celery import shared_task

from app.core.celery_app import get_celery_app
from app.services.hf_datasets import (
    refresh_datasets_cache,
    fetch_and_cache_all_datasets,
)

# Configure logging
logger = logging.getLogger(__name__)

# Get the shared Celery app instance
celery_app = get_celery_app()

# Cache TTLs (seconds)
DATASET_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days for the dataset listing
BATCH_PROGRESS_CACHE_TTL = 60 * 60 * 24 * 7  # 7 days for batch progress
DATASET_SIZE_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days for size info
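
# A minimal sketch of how these TTLs might pair with the Redis helpers in
# app.services.redis_client. The (key, value, ttl) call shape below is an
# assumption, not this project's verified signature:
#
#     from app.services.redis_client import generate_cache_key, sync_cache_set
#
#     key = generate_cache_key("dataset_size", dataset_id)
#     sync_cache_set(key, size_bytes, ttl=DATASET_SIZE_CACHE_TTL)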


@celery_app.task(name="app.tasks.dataset_tasks.refresh_hf_datasets_cache")
def refresh_hf_datasets_cache():
    """Celery task to refresh the HuggingFace datasets cache in Redis."""
    logger.info("Starting refresh of HuggingFace datasets cache via Celery task.")
    try:
        refresh_datasets_cache()
        logger.info("Successfully refreshed HuggingFace datasets cache.")
        return {"status": "success"}
    except Exception as e:
        logger.error(f"Failed to refresh HuggingFace datasets cache: {e}", exc_info=True)
        return {"status": "error", "error": str(e)}


@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_datasets_page(self, offset: int, limit: int):
    """
    Celery task to fetch and cache a single page of datasets from Hugging Face.

    Retries up to 3 times on failure, with a 10-second delay between attempts.
    """
    logger.info(f"[fetch_datasets_page] ENTRY: offset={offset}, limit={limit}")
    try:
        # Local import, likely to avoid a circular import at module load.
        from app.services.hf_datasets import process_datasets_page

        logger.info(f"[fetch_datasets_page] Calling process_datasets_page with offset={offset}, limit={limit}")
        result = process_datasets_page(offset, limit)
        logger.info(f"[fetch_datasets_page] SUCCESS: offset={offset}, limit={limit}, result={result}")
        return result
    except Exception as exc:
        logger.error(f"[fetch_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True)
        raise self.retry(exc=exc)
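
# A minimal sketch of fanning this task out across pages with celery.group.
# The page size and total count below are placeholders, not values taken from
# this project:
#
#     from celery import group
#
#     PAGE_SIZE = 500   # assumed page size
#     total = 10_000    # assumed total number of datasets
#
#     job = group(
#         fetch_datasets_page.s(offset, PAGE_SIZE)
#         for offset in range(0, total, PAGE_SIZE)
#     )
#     result = job.apply_async()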


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_hf_datasets_full_cache(self):
    """
    Celery task to rebuild the full Hugging Face datasets cache.

    Requires the HUGGINGFACEHUB_API_TOKEN environment variable; retries up to
    3 times on failure, with a 60-second delay between attempts.
    """
    logger.info("[refresh_hf_datasets_full_cache] Starting full Hugging Face datasets cache refresh.")
    try:
        token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        if not token:
            logger.error("[refresh_hf_datasets_full_cache] HUGGINGFACEHUB_API_TOKEN not set.")
            return {"status": "error", "error": "HUGGINGFACEHUB_API_TOKEN not set"}
        count = fetch_and_cache_all_datasets(token)
        logger.info(f"[refresh_hf_datasets_full_cache] Cached {count} datasets.")
        return {"status": "ok", "cached": count}
    except Exception as exc:
        logger.error(f"[refresh_hf_datasets_full_cache] ERROR: {exc}", exc_info=True)
        raise self.retry(exc=exc)
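
# A minimal sketch of triggering the full refresh from application code and
# waiting for the outcome; the 10-minute timeout is an assumption:
#
#     async_result = refresh_hf_datasets_full_cache.delay()
#     outcome = async_result.get(timeout=600)  # e.g. {"status": "ok", "cached": ...}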