# Cache service module: Redis-backed cache with an in-memory fallback.
import os | |
import json | |
import logging | |
from typing import Any, Optional, Dict | |
from datetime import datetime, timedelta | |
logger = logging.getLogger(__name__) | |
class CacheService:
    """JSON cache backed by Redis, with a per-process in-memory fallback.

    Values are serialized with ``json.dumps(..., default=str)`` before being
    written to Redis, so anything read back from Redis is plain JSON data.
    When Redis is unavailable (or a single Redis call fails) the service
    transparently uses a dictionary held in this process instead.

    NOTE(review): the Redis client created here is the synchronous one and
    its calls are not awaited, so the ``async`` methods block the event loop
    for the duration of each Redis round trip (socket timeouts are 5 s).
    """

    def __init__(self):
        """Create the empty fallback stores and try to connect to Redis."""
        self.redis_client = None
        self.in_memory_cache = {}
        self.cache_ttl = {}  # key -> expiry datetime for in-memory entries
        self._initialize_redis()

    def _initialize_redis(self):
        """Connect to the first reachable Redis endpoint.

        Candidates are tried in order: the Docker Compose service name
        ``redis``, ``REDIS_HOST``/``REDIS_PORT``, ``REDIS_URL``, then
        ``localhost``. On success ``self.redis_client`` holds a client that
        answered ``PING``; otherwise it is left as ``None`` and the in-memory
        fallback is used.
        """
        try:
            import redis
        except ImportError:
            logger.warning("⚠️ Redis not installed. Using in-memory fallback cache.")
            self.redis_client = None
            return

        try:
            candidates = [
                # Docker Compose service name
                {"host": "redis", "port": 6379, "db": 0},
            ]

            # Environment variables. A malformed REDIS_PORT only disables this
            # one candidate instead of aborting every connection attempt.
            port_text = os.getenv("REDIS_PORT", "6379")
            try:
                candidates.append(
                    {"host": os.getenv("REDIS_HOST", "redis"), "port": int(port_text), "db": 0}
                )
            except ValueError:
                logger.warning(f"Ignoring invalid REDIS_PORT value: {port_text!r}")

            # URL-based connection
            candidates.append({"url": os.getenv("REDIS_URL", "redis://redis:6379/0")})
            # Localhost fallback
            candidates.append({"host": "localhost", "port": 6379, "db": 0})

            tried = []
            for config in candidates:
                # With default environment values several candidates are
                # identical; retrying them would only cost another timeout.
                if config in tried:
                    continue
                tried.append(config)

                # Log-safe description: drop any "user:password@" part of a
                # URL so credentials never reach the logs.
                if "url" in config:
                    label = "url=" + config["url"].rsplit("@", 1)[-1]
                else:
                    label = f"{config['host']}:{config['port']}/{config['db']}"

                try:
                    if "url" in config:
                        client = redis.from_url(
                            config["url"], socket_timeout=5, socket_connect_timeout=5
                        )
                    else:
                        client = redis.Redis(
                            **config, socket_timeout=5, socket_connect_timeout=5
                        )
                    # Test connection
                    client.ping()
                except Exception as e:
                    logger.debug(f"Redis connection failed with config {label}: {e}")
                    continue

                self.redis_client = client
                logger.info(f"✅ Redis connected: {label}")
                return

            logger.warning("⚠️ Redis connection failed. Using in-memory fallback cache.")
            self.redis_client = None
        except Exception as e:
            logger.warning(f"⚠️ Redis initialization failed: {e}. Using in-memory fallback.")
            self.redis_client = None

    def _is_expired(self, key: str) -> bool:
        """Return True when ``key`` has an in-memory expiry that has passed.

        Keys without a recorded expiry are treated as not expired.
        """
        expiry = self.cache_ttl.get(key)
        if expiry is None:
            return False
        return datetime.now() > expiry

    def _clean_expired(self):
        """Drop every in-memory entry whose expiry time has passed."""
        now = datetime.now()
        expired_keys = [k for k, expiry in self.cache_ttl.items() if now > expiry]
        for key in expired_keys:
            self.in_memory_cache.pop(key, None)
            self.cache_ttl.pop(key, None)

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key`` or ``None`` when absent.

        Redis is consulted first. On a Redis miss or error the in-memory
        store is checked as well, because ``set`` writes there whenever a
        Redis write fails. A cached JSON ``null`` is indistinguishable from
        a miss.
        """
        try:
            if self.redis_client:
                try:
                    raw = self.redis_client.get(key)
                    if raw:
                        # Clients created with decode_responses=True return
                        # str; the default client returns bytes.
                        if isinstance(raw, bytes):
                            raw = raw.decode("utf-8")
                        return json.loads(raw)
                except Exception as e:
                    logger.warning(f"Redis get failed for key {key}: {e}")
                # Miss or failure: fall through to in-memory cache

            # In-memory fallback
            self._clean_expired()
            if key in self.in_memory_cache and not self._is_expired(key):
                return self.in_memory_cache[key]
            return None
        except Exception as e:
            logger.error(f"Cache get error for key {key}: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """Store ``value`` under ``key`` for ``ttl`` seconds.

        Returns True when the value was stored in Redis or in the in-memory
        fallback, and False when the value cannot be JSON-serialized or an
        unexpected error occurs.
        """
        try:
            # Serialize up front so unserializable values are rejected
            # regardless of which backend ends up holding them.
            serialized_value = json.dumps(value, default=str)

            if self.redis_client:
                try:
                    result = self.redis_client.setex(key, ttl, serialized_value)
                    return bool(result)
                except Exception as e:
                    logger.warning(f"Redis set failed for key {key}: {e}")
                    # Fall through to in-memory cache

            # In-memory fallback.
            # NOTE(review): the original object is stored by reference here,
            # whereas Redis returns the JSON round-tripped form.
            self.in_memory_cache[key] = value
            self.cache_ttl[key] = datetime.now() + timedelta(seconds=ttl)
            return True
        except Exception as e:
            logger.error(f"Cache set error for key {key}: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Remove ``key`` from Redis and from the in-memory store.

        Returns True when the key was removed from at least one of them.
        """
        try:
            deleted = False
            if self.redis_client:
                try:
                    deleted = bool(self.redis_client.delete(key))
                except Exception as e:
                    logger.warning(f"Redis delete failed for key {key}: {e}")

            # In-memory cleanup
            if key in self.in_memory_cache:
                del self.in_memory_cache[key]
                deleted = True
            self.cache_ttl.pop(key, None)
            return deleted
        except Exception as e:
            logger.error(f"Cache delete error for key {key}: {e}")
            return False

    async def exists(self, key: str) -> bool:
        """Return True when ``key`` is present in Redis or, unexpired, in memory."""
        try:
            if self.redis_client:
                try:
                    if self.redis_client.exists(key):
                        return True
                except Exception as e:
                    logger.warning(f"Redis exists failed for key {key}: {e}")
                # Miss or failure: the key may still live in the fallback

            # In-memory check
            self._clean_expired()
            return key in self.in_memory_cache and not self._is_expired(key)
        except Exception as e:
            logger.error(f"Cache exists error for key {key}: {e}")
            return False

    async def clear(self) -> bool:
        """Empty both backends; return False only on an unexpected error.

        NOTE: this issues ``FLUSHDB``, which removes every key in the
        selected Redis database, not only keys written by this service.
        """
        try:
            if self.redis_client:
                try:
                    self.redis_client.flushdb()
                except Exception as e:
                    logger.warning(f"Redis clear failed: {e}")

            # Clear in-memory cache
            self.in_memory_cache.clear()
            self.cache_ttl.clear()
            return True
        except Exception as e:
            logger.error(f"Cache clear error: {e}")
            return False

    async def get_stats(self) -> Dict[str, Any]:
        """Return a summary of the active backend and its key counts."""
        stats = {
            "backend": "redis" if self.redis_client else "in_memory",
            "in_memory_keys": len(self.in_memory_cache),
        }
        if not self.redis_client:
            stats["redis_connected"] = False
            return stats

        try:
            info = self.redis_client.info()
            stats.update({
                "redis_connected": True,
                "redis_used_memory": info.get("used_memory_human", "N/A"),
                # DBSIZE counts keys in the database this client selected,
                # which is not necessarily db0 when REDIS_URL is used.
                "redis_keys": self.redis_client.dbsize(),
            })
        except Exception as e:
            stats.update({
                "redis_connected": False,
                "redis_error": str(e),
            })
        return stats

    def get_redis_health(self) -> Dict[str, Any]:
        """Ping Redis and report ``unavailable``, ``healthy`` or ``unhealthy``."""
        if not self.redis_client:
            return {"status": "unavailable", "message": "Redis client not initialized"}
        try:
            self.redis_client.ping()
            return {"status": "healthy", "message": "Redis connection OK"}
        except Exception as e:
            return {"status": "unhealthy", "message": f"Redis ping failed: {e}"}
# Shared module-level CacheService; constructing it attempts the Redis
# connection as soon as this module is imported.
cache_service = CacheService()
# Convenience functions | |
async def get_cache(key: str) -> Optional[Any]:
    """Look up ``key`` through the shared ``cache_service`` instance."""
    cached = await cache_service.get(key)
    return cached
async def set_cache(key: str, value: Any, ttl: int = 3600) -> bool:
    """Store ``value`` under ``key`` for ``ttl`` seconds via ``cache_service``."""
    stored = await cache_service.set(key, value, ttl)
    return stored
async def delete_cache(key: str) -> bool:
    """Remove ``key`` via the shared ``cache_service`` and return its result."""
    removed = await cache_service.delete(key)
    return removed
async def clear_cache() -> bool:
    """Empty the shared ``cache_service`` and return its result."""
    cleared = await cache_service.clear()
    return cleared
async def cache_exists(key: str) -> bool:
    """Report whether the shared ``cache_service`` currently holds ``key``."""
    present = await cache_service.exists(key)
    return present