#!/usr/bin/env python3
"""
Models Adapter Service
======================
Adapter layer for ai_models.py integration.
Provides a standardized API for model predictions used by endpoints and the DataResolver.
"""
import importlib.util
import os
import logging
import json
import uuid
from typing import Dict, Any, Optional, List
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
# Path to ai_models.py - adjust if needed
AI_MODELS_PATH = os.getenv("AI_MODELS_PATH", "/mnt/data/ai_models.py")
# If running locally, try relative path
if not os.path.exists(AI_MODELS_PATH):
    # Try relative path from project root
    local_path = Path(__file__).parent.parent.parent / "ai_models.py"
    if local_path.exists():
        AI_MODELS_PATH = str(local_path)
        logger.info(f"Using local ai_models.py at {AI_MODELS_PATH}")
# Token handling - read from environment with fallbacks
HF_TOKEN = os.getenv("hf-token") or os.getenv("HF_API_TOKEN") or os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
if not HF_TOKEN:
    logger.warning("HF token not set. Model calls will fail until hf-token or HF_API_TOKEN is provided.")
# Global reference to loaded ai_models module
_ai_models_module = None
_initialized = False
def _load_ai_models():
    """Dynamically load ai_models.py module"""
    global _ai_models_module, _initialized
    if _ai_models_module is not None:
        return _ai_models_module
    try:
        spec = importlib.util.spec_from_file_location("ai_models", AI_MODELS_PATH)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load spec from {AI_MODELS_PATH}")
        _ai_models_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(_ai_models_module)
        # Initialize models if an init hook exists on the loaded module
        if hasattr(_ai_models_module, "initialize_models"):
            init_result = _ai_models_module.initialize_models()
            logger.info(f"ai_models initialized: {init_result}")
            _initialized = True
        elif hasattr(_ai_models_module, "load_models"):
            if HF_TOKEN:
                _ai_models_module.load_models(hf_token=HF_TOKEN)
            else:
                _ai_models_module.load_models()
            _initialized = True
        logger.info(f"Successfully loaded ai_models from {AI_MODELS_PATH}")
        return _ai_models_module
    except Exception as e:
        logger.error(f"Failed to load ai_models from {AI_MODELS_PATH}: {e}")
        raise
def _get_ai_models():
    """Get ai_models module, loading it if necessary"""
    if _ai_models_module is None:
        _load_ai_models()
    return _ai_models_module
# Export for use in other modules
__all__ = [
    "predict_with_model",
    "persist_model_output",
    "get_model_info",
    "initialize_models",
    "_get_ai_models",  # Exported for endpoint use
]
def _normalize_model_output(
    raw_output: Dict[str, Any],
    model_key: str,
    symbol: Optional[str] = None,
    prediction_type: str = "sentiment"
) -> Dict[str, Any]:
    """
    Normalize ai_models output to the standard contract format.

    Standard format:
    {
        "id": "<uuid>",
        "symbol": "BTC" | null,
        "type": "sentiment|summary|impute|signal|estimate",
        "score": 0.87,
        "model": "model_key",
        "explain": { "reason": "...", "features": {...} },
        "data": { /* domain-specific payload */ },
        "meta": {
            "source": "hf-model",
            "generated_at": "ISO",
            "confidence": 0.87,
            "cache_ttl_seconds": 30,
            "attempted": ["hf-model"]
        }
    }
    """
    output_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    # Extract score/confidence from raw output
    score = 0.5
    confidence = 0.5
    if isinstance(raw_output, dict):
        # Try the different score fields models are known to emit
        score = raw_output.get("score", raw_output.get("confidence", raw_output.get("vote", 0.5)))
        try:
            score = float(score)
        except (TypeError, ValueError):
            # Non-numeric score (e.g. a label leaked into the field)
            score = 0.5
        confidence = raw_output.get("confidence", raw_output.get("score", abs(score)))
        try:
            confidence = float(confidence)
        except (TypeError, ValueError):
            confidence = abs(score)
        # Extract label if present
        label = raw_output.get("label", raw_output.get("sentiment", "neutral"))
        # Build explain block
        explain = {
            "reason": raw_output.get("reasoning", raw_output.get("analysis", "")),
            "features": raw_output.get("scores", raw_output.get("details", {}))
        }
        # Build data payload, passing through any fields not already consumed above
        data = {
            "label": label,
            "score": score,
            "confidence": confidence,
            **{k: v for k, v in raw_output.items() if k not in ["score", "confidence", "label", "sentiment", "reasoning", "analysis", "scores", "details"]}
        }
    else:
        # If output is not a dict, wrap it
        data = {"result": raw_output}
        explain = {"reason": "Model output", "features": {}}
    return {
        "id": output_id,
        "symbol": symbol,
        "type": prediction_type,
        "score": float(score),
        "model": model_key,
        "explain": explain,
        "data": data,
        "meta": {
            "source": "hf-model",
            "generated_at": now,
            "confidence": float(confidence),
            "cache_ttl_seconds": 30,
            "attempted": ["hf-model"]
        }
    }
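# Illustrative mapping only (the raw shape shown is an assumption about what
# ai_models emits, not a guaranteed schema):
#   _normalize_model_output(
#       {"label": "bullish", "score": 0.87, "reasoning": "positive momentum"},
#       model_key="crypto_sent", symbol="BTC")
#   -> {"id": "...", "symbol": "BTC", "type": "sentiment", "score": 0.87,
#       "model": "crypto_sent",
#       "explain": {"reason": "positive momentum", "features": {}},
#       "data": {"label": "bullish", "score": 0.87, "confidence": 0.87},
#       "meta": {"source": "hf-model", ...}}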
async def predict_with_model(
    model_key: str,
    input_payload: Dict[str, Any],
    symbol: Optional[str] = None
) -> Dict[str, Any]:
    """
    Standardized call used by endpoints and DataResolver.

    Args:
        model_key: string name of the model/pipeline in ai_models
        input_payload: dict with required fields (text, mode, etc.)
        symbol: optional symbol for context

    Returns:
        Normalized dict: { id, model, score, data, explain, meta }
    """
    try:
        ai_models = _get_ai_models()
        # Extract text from payload
        text = input_payload.get("text", input_payload.get("context", ""))
        if not text:
            raise ValueError("text or context required in input_payload")
        # Determine prediction type from model_key
        prediction_type = "sentiment"
        if "summar" in model_key.lower():
            prediction_type = "summary"
        elif "signal" in model_key.lower() or "trade" in model_key.lower():
            prediction_type = "signal"
        elif "impute" in model_key.lower() or "gap" in model_key.lower():
            prediction_type = "impute"
        elif "correction" in model_key.lower():
            prediction_type = "correction"
        # Map model_key to an ai_models function.
        # Try the generic safe call first; fall back to model-specific functions.
        if hasattr(ai_models, "call_model_safe"):
            result = ai_models.call_model_safe(model_key, text, **input_payload.get("params", {}))
            if result.get("status") == "success":
                raw_output = result.get("data", {})
            elif result.get("status") == "unavailable":
                logger.warning(f"Model {model_key} unavailable: {result.get('error')}")
                raise RuntimeError(f"Model unavailable: {result.get('error')}")
            else:
                raise RuntimeError(f"Model call failed: {result.get('error')}")
        # Fallback to specific functions based on model_key
        elif "sentiment" in model_key.lower() or "sent" in model_key.lower():
            mode = input_payload.get("mode", "crypto")
            if mode == "financial":
                raw_output = ai_models.analyze_financial_sentiment(text)
            elif mode == "social":
                raw_output = ai_models.analyze_social_sentiment(text)
            else:
                # "crypto" and any unrecognized mode use the crypto ensemble
                raw_output = ai_models.ensemble_crypto_sentiment(text)
        elif "summar" in model_key.lower():
            # call_model_safe is not available on this path, so fall back to a
            # naive truncation summary
            raw_output = {"summary": text[:200], "confidence": 0.6}
        else:
            raise ValueError(f"Unknown model_key: {model_key} and no call_model_safe available")
        # Normalize output
        normalized = _normalize_model_output(raw_output, model_key, symbol, prediction_type)
        # Persist to database
        await persist_model_output(normalized)
        return normalized
    except Exception as e:
        logger.error(f"Error in predict_with_model for {model_key}: {e}", exc_info=True)
        raise
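# Hedged usage sketch: how a caller such as a FastAPI endpoint might invoke
# predict_with_model. The model key and payload fields below are illustrative
# assumptions, not names guaranteed to exist in ai_models.py.
#
#     normalized = await predict_with_model(
#         model_key="crypto_sent",
#         input_payload={"text": "BTC breaks above resistance", "mode": "crypto"},
#         symbol="BTC",
#     )
#     label = normalized["data"]["label"]
#     confidence = normalized["meta"]["confidence"]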
async def persist_model_output(output: Dict[str, Any]) -> bool:
    """
    Persist model output to the database.

    Args:
        output: Normalized model output dict

    Returns:
        bool: Success status
    """
    try:
        from database.models import ModelOutput
        from database.db_manager import db_manager
        # Extract fields (json is already imported at module level)
        model_output = ModelOutput(
            id=output.get("id", str(uuid.uuid4())),
            model_key=output.get("model", "unknown"),
            symbol=output.get("symbol", "N/A"),
            type=output.get("type", "unknown"),
            score=output.get("score", 0.0),
            data_json=json.dumps(output.get("data", {})),
            meta_json=json.dumps(output.get("meta", {}))
        )
        # Save to database
        with db_manager.get_session() as session:
            session.add(model_output)
            session.commit()
        logger.debug(f"Persisted model output: {output.get('model')} for {output.get('symbol')}")
        return True
    except Exception as e:
        logger.error(f"Error persisting model output: {e}", exc_info=True)
        return False
def get_model_info() -> Dict[str, Any]:
    """Get information about available models"""
    try:
        ai_models = _get_ai_models()
        if hasattr(ai_models, "get_model_info"):
            return ai_models.get_model_info()
        elif hasattr(ai_models, "registry_status"):
            status = ai_models.registry_status()
            return {
                "transformers_available": status.get("transformers_available", False),
                "models_initialized": status.get("initialized", False),
                "models_loaded": status.get("pipelines_loaded", 0),
                "hf_mode": status.get("hf_mode", "off")
            }
        else:
            return {
                "status": "loaded",
                "initialized": _initialized
            }
    except Exception as e:
        logger.error(f"Error getting model info: {e}")
        return {
            "status": "error",
            "error": str(e)
        }
def initialize_models() -> Dict[str, Any]:
    """Initialize models from ai_models"""
    try:
        ai_models = _get_ai_models()
        if hasattr(ai_models, "initialize_models"):
            return ai_models.initialize_models()
        else:
            return {
                "status": "already_loaded",
                "initialized": _initialized
            }
    except Exception as e:
        logger.error(f"Error initializing models: {e}")
        return {
            "status": "error",
            "error": str(e)
        }
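
if __name__ == "__main__":
    # Minimal smoke test, assuming ai_models.py is reachable at AI_MODELS_PATH
    # (or the relative fallback above). get_model_info() traps loader errors
    # and returns an error payload, so this is safe to run without models.
    logging.basicConfig(level=logging.INFO)
    print(json.dumps(get_model_info(), indent=2))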