#!/usr/bin/env python3
"""
ohlc_worker_enhanced.py
Enhanced OHLC worker that uses provider registry files to discover and fetch from multiple providers.
- Loads provider registries:
providers_registered.json
providers_config_extended.json
crypto_resources_unified_2025-11-11.json
WEBSOCKET_URL_FIX.json
- Discovers providers that expose OHLC/candles/klines endpoints
- Fetches OHLC data via REST (prefers REST, minimal WebSocket usage)
- Stores normalized rows into SQLite DB: data/crypto_monitor.db
- Produces summary log and JSON output
Usage:
python workers/ohlc_worker_enhanced.py --once --symbols BTC-USDT,ETH-USDT --timeframe 1h
or run in loop:
python workers/ohlc_worker_enhanced.py --loop --interval 300
"""
import os
import json
import time
import argparse
import logging
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from pathlib import Path
import sys
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
# httpx for HTTP requests
try:
import httpx
except ImportError:
print("ERROR: httpx not installed. Run: pip install httpx")
sys.exit(1)
# SQLite via SQLAlchemy for convenience
try:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, text
from sqlalchemy.exc import OperationalError
except ImportError:
print("ERROR: sqlalchemy not installed. Run: pip install sqlalchemy")
sys.exit(1)
# Determine base directory (repo root)
BASE_DIR = Path(__file__).parent.parent.resolve()
# Config / paths (override via env, default to local structure)
PROVIDERS_REGISTERED = os.environ.get("OHLC_PROVIDERS_REGISTERED", str(BASE_DIR / "data" / "providers_registered.json"))
PROVIDERS_CONFIG = os.environ.get("OHLC_PROVIDERS_CONFIG", str(BASE_DIR / "app" / "providers_config_extended.json"))
CRYPTO_RESOURCES = os.environ.get("OHLC_CRYPTO_RESOURCES", str(BASE_DIR / "api-resources" / "crypto_resources_unified_2025-11-11.json"))
WEBSOCKET_FIX = os.environ.get("OHLC_WEBSOCKET_FIX", str(BASE_DIR / "WEBSOCKET_URL_FIX.json"))
EXCHANGE_ENDPOINTS = os.environ.get("OHLC_EXCHANGE_ENDPOINTS", str(BASE_DIR / "data" / "exchange_ohlc_endpoints.json"))
DB_PATH = os.environ.get("OHLC_DB", str(BASE_DIR / "data" / "crypto_monitor.db"))
LOG_PATH = os.environ.get("OHLC_LOG", str(BASE_DIR / "tmp" / "ohlc_worker_enhanced.log"))
SUMMARY_PATH = os.environ.get("OHLC_SUMMARY", str(BASE_DIR / "tmp" / "ohlc_worker_enhanced_summary.json"))
# Ensure output directories exist (log/summary under tmp, DB under data)
Path(LOG_PATH).parent.mkdir(parents=True, exist_ok=True)
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
# Defaults
DEFAULT_TIMEFRAME = "1h"
DEFAULT_LIMIT = 200 # number of candles to request per fetch when supported
USER_AGENT = "ohlc_worker_enhanced/1.0"
# Logging
logging.basicConfig(
filename=LOG_PATH,
filemode="a",
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("ohlc_worker_enhanced")
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(console_handler)
# Utility: read json safely
def read_json(path: str) -> Any:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except FileNotFoundError:
logger.warning(f"File not found: {path}")
return None
except json.JSONDecodeError as e:
logger.warning(f"JSON decode error for {path}: {e}")
return None
except Exception as e:
logger.warning(f"read_json failed for {path}: {e}")
return None
# Normalize provider records: try to extract REST OHLC endpoints
def extract_ohlc_endpoints(provider_record: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
provider_record: dict that may contain keys:
- base_url, endpoints, api_endpoints, ohlc, candles, kline
Return list of endpoint dicts: {url_template, method, params_spec}
url_template may include placeholders {symbol} {timeframe} {limit} etc.
"""
endpoints = []
# common places
candidates = []
if isinstance(provider_record, dict):
for key in ("endpoints", "api_endpoints", "endpoints_list"):
if key in provider_record and isinstance(provider_record[key], list):
candidates.extend(provider_record[key])
elif key in provider_record and isinstance(provider_record[key], dict):
# Sometimes endpoints is a dict with keys for different endpoint types
for ep_key, ep_val in provider_record[key].items():
if isinstance(ep_val, (str, dict)):
candidates.append(ep_val)
# sometimes endpoints are top-level keys
for pkey in ("ohlc", "candles", "kline", "klines", "ticker"):
if pkey in provider_record:
candidates.append(provider_record[pkey])
# fallback: base_url + known path templates
base = provider_record.get("base_url") or provider_record.get("url") or provider_record.get("api")
if base:
# Check if base URL suggests it's a known exchange
base_lower = base.lower()
if "binance" in base_lower:
# Binance-style endpoints
common_templates = [
{"url": base.rstrip("/") + "/api/v3/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"},
{"url": base.rstrip("/") + "/api/v1/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"},
]
candidates.extend(common_templates)
elif any(x in base_lower for x in ["kraken", "coinbase", "okx", "huobi", "bybit"]):
# Generic exchange templates - these are just candidates, actual usage will depend on testing
common_templates = [
{"url": base.rstrip("/") + "/api/v1/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"},
{"url": base.rstrip("/") + "/v1/market/candles?market={symbol}&period={timeframe}&limit={limit}", "method": "GET"},
]
candidates.extend(common_templates)
# normalize candidate entries
for c in candidates:
if isinstance(c, str):
# Check if string looks like it contains OHLC-related keywords
if any(kw in c.lower() for kw in ["kline", "candle", "ohlc", "chart", "history"]):
endpoints.append({"url": c, "method": "GET"})
elif isinstance(c, dict):
# prefer url, template, path
url = c.get("url") or c.get("template") or c.get("path") or c.get("endpoint")
if url and isinstance(url, str):
# Check if URL looks OHLC-related
if any(kw in url.lower() for kw in ["kline", "candle", "ohlc", "chart", "history"]):
endpoints.append({"url": url, "method": c.get("method", "GET"), "meta": c.get("meta")})
            else:
                # nested entries without a direct URL string cannot be fetched; skip them
                logger.debug(f"Skipping endpoint candidate without direct URL: {c}")
return endpoints
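# Illustrative example (a hypothetical record, not taken from the registries):
#   extract_ohlc_endpoints({"name": "binance", "base_url": "https://api.binance.com"})
# yields Binance-style candidates such as
#   [{"url": "https://api.binance.com/api/v3/klines?symbol={symbol}&interval={timeframe}&limit={limit}",
#     "method": "GET"}, ...]
# These are only candidates; whether an endpoint actually works is decided at fetch time.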
# Render an endpoint template by filling {symbol}/{timeframe}/{limit}-style placeholders
def render_template(url_template: str, symbol: str, timeframe: str, limit: int) -> str:
    try:
        return url_template.format(symbol=symbol, timeframe=timeframe, limit=limit)
    except Exception:
        # fall back to simple replacements for common placeholders
        u = url_template.replace("{symbol}", symbol)
        u = u.replace("{timeframe}", timeframe).replace("{interval}", timeframe)
        u = u.replace("{limit}", str(limit))
        # also accept {market} and {pair} as aliases for symbol
        u = u.replace("{market}", symbol).replace("{pair}", symbol)
        return u
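# Sanity check (the host below is a placeholder, not a registry provider):
#   render_template(
#       "https://api.example.com/klines?symbol={symbol}&interval={timeframe}&limit={limit}",
#       "BTCUSDT", "1h", 200)
#   -> "https://api.example.com/klines?symbol=BTCUSDT&interval=1h&limit=200"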
# Map friendly timeframes to exchange-style interval names
def normalize_timeframe(tf: str) -> str:
    # The mapping is currently the identity; it is kept as an extension point
    # for providers that use different interval names (e.g. "60" or "1hour").
    mapping = {
        "1m": "1m", "5m": "5m", "15m": "15m", "30m": "30m",
        "1h": "1h", "4h": "4h", "1d": "1d", "1w": "1w",
    }
    return mapping.get(tf, tf)
# DB: ensure the ohlc_data table exists and return its Table object
def ensure_db_and_table(engine):
meta = MetaData()
ohlc = Table(
"ohlc_data", meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("provider", String(128), index=True),
Column("symbol", String(64), index=True),
Column("timeframe", String(16), index=True),
Column("ts", DateTime, index=True), # candle start time
Column("open", Float),
Column("high", Float),
Column("low", Float),
Column("close", Float),
Column("volume", Float),
Column("raw", String), # raw JSON as string if needed
extend_existing=True
)
meta.create_all(engine)
return ohlc
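# For reference, the generated schema is roughly equivalent to the following
# SQLite DDL (as SQLAlchemy renders it; shown here for orientation only):
#   CREATE TABLE ohlc_data (
#       id INTEGER NOT NULL PRIMARY KEY,
#       provider VARCHAR(128), symbol VARCHAR(64), timeframe VARCHAR(16),
#       ts DATETIME, open FLOAT, high FLOAT, low FLOAT, close FLOAT,
#       volume FLOAT, raw VARCHAR
#   );
# plus the secondary indexes implied by index=True on provider/symbol/timeframe/ts.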
# Build the provider list from the registry JSON files
def build_providers_list() -> List[Dict[str, Any]]:
    # Priority: exchange_ohlc_endpoints.json -> providers_config_extended.json
    # -> providers_registered.json -> providers from crypto_resources
providers = []
# Read all config files
cfg = read_json(PROVIDERS_CONFIG) or {}
reg = read_json(PROVIDERS_REGISTERED) or {}
resources = read_json(CRYPTO_RESOURCES) or {}
ws_map = read_json(WEBSOCKET_FIX) or {}
logger.info(f"Loading providers from config files:")
logger.info(f" - {PROVIDERS_CONFIG}: {'OK' if cfg else 'SKIP'}")
logger.info(f" - {PROVIDERS_REGISTERED}: {'OK' if reg else 'SKIP'}")
logger.info(f" - {CRYPTO_RESOURCES}: {'OK' if resources else 'SKIP'}")
logger.info(f" - {WEBSOCKET_FIX}: {'OK' if ws_map else 'SKIP'}")
# Also load exchange endpoints config
exchange_cfg = read_json(EXCHANGE_ENDPOINTS) or {}
logger.info(f" - {EXCHANGE_ENDPOINTS}: {'OK' if exchange_cfg else 'SKIP'}")
    # providers from config
    cfg_providers = cfg.get("providers") if isinstance(cfg, dict) else (cfg if isinstance(cfg, list) else [])
    for p in cfg_providers or []:
        if isinstance(p, dict):
            p["_source_file"] = PROVIDERS_CONFIG
            p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url"))
            providers.append(p)
# registered providers
if isinstance(reg, list):
for p in reg:
if isinstance(p, dict):
p["_source_file"] = PROVIDERS_REGISTERED
p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url"))
providers.append(p)
elif isinstance(reg, dict):
for p in (reg.get("providers") or []):
if isinstance(p, dict):
p["_source_file"] = PROVIDERS_REGISTERED
p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url"))
providers.append(p)
# also scan resources (some resources include provider endpoints)
if isinstance(resources, dict):
# resources may contain provider entries or endpoints
entries = resources.get("providers") or resources.get("exchanges") or []
for p in entries:
if isinstance(p, dict):
p["_source_file"] = CRYPTO_RESOURCES
p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url"))
providers.append(p)
    # Load exchange endpoints; these are known working endpoints, so prepend
    # them so that deduplication below (which keeps the first occurrence)
    # actually gives them the highest priority
    if isinstance(exchange_cfg, dict):
        exchange_providers = []
        for p in (exchange_cfg.get("providers") or []):
            if isinstance(p, dict):
                p["_source_file"] = EXCHANGE_ENDPOINTS
                p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url"))
                # Mark these as high priority
                p["_priority"] = "high"
                exchange_providers.append(p)
        providers = exchange_providers + providers
    # deduplicate by base_url or name (first occurrence wins)
seen = set()
unique = []
for p in providers:
key = (p.get("base_url") or p.get("name") or p.get("id") or "")[:200]
if key and key not in seen:
seen.add(key)
unique.append(p)
logger.info(f"Loaded {len(unique)} unique providers")
return unique
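# After merging, each provider dict carries the bookkeeping fields added above,
# e.g. (hypothetical entry):
#   {"name": "binance", "base_url": "https://api.binance.com",
#    "_source_file": ".../data/exchange_ohlc_endpoints.json",
#    "_ws_url": None, "_priority": "high"}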
# Discover candidate REST endpoints for OHLC from providers
def discover_ohlc_providers(providers: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
out = []
for p in providers:
eps = extract_ohlc_endpoints(p)
if eps:
out.append({"provider": p, "endpoints": eps})
return out
# Fetch-response normalization: convert provider payloads to standardized OHLC rows
def _parse_candle_ts(ts: Any) -> Optional[datetime]:
    """Parse an epoch timestamp (seconds or milliseconds) or an ISO-8601 string."""
    if isinstance(ts, str) and ts.isdigit():
        ts = int(ts)
    if isinstance(ts, (int, float)):
        ts = int(ts)
        # handle both milliseconds and seconds
        return datetime.utcfromtimestamp(ts / 1000) if ts > 1e10 else datetime.utcfromtimestamp(ts)
    try:
        return datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
    except Exception:
        return None


def _rows_from_list_of_lists(items: List[Any]) -> List[Dict[str, Any]]:
    """Normalize [[openTime, open, high, low, close, volume, ...], ...] (Binance-like)."""
    rows = []
    for item in items:
        if not item or len(item) < 6:
            continue
        rows.append({
            "ts": _parse_candle_ts(item[0]) if item[0] else None,
            "open": float(item[1]),
            "high": float(item[2]),
            "low": float(item[3]),
            "close": float(item[4]),
            "volume": float(item[5]),
        })
    return rows


def _rows_from_list_of_dicts(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Normalize [{"time": ..., "open": ..., ...}, ...] with short-key fallbacks."""
    rows = []
    for item in items:
        ts = item.get("time") or item.get("ts") or item.get("timestamp") or item.get("date")
        rows.append({
            "ts": _parse_candle_ts(ts),
            "open": float(item.get("open") or item.get("o") or 0),
            "high": float(item.get("high") or item.get("h") or 0),
            "low": float(item.get("low") or item.get("l") or 0),
            "close": float(item.get("close") or item.get("c") or 0),
            "volume": float(item.get("volume") or item.get("v") or 0),
        })
    return rows


def normalize_response_to_candles(provider_name: str, url: str, resp_json: Any, symbol: str, timeframe: str) -> List[Dict[str, Any]]:
    """
    Handle common response shapes:
      - list of lists: [[ts, open, high, low, close, vol], ...]
      - dict with a 'data' key holding a list of lists or a list of dicts
      - list of dicts: [{"time": ..., "open": ..., "high": ..., "low": ..., "close": ..., "volume": ...}, ...]
    Return a list of rows: {ts, open, high, low, close, volume}.
    """
    try:
        j = resp_json
        if isinstance(j, list) and j:
            # common: list-of-lists (Binance-like) or list-of-dicts
            if isinstance(j[0], list):
                return _rows_from_list_of_lists(j)
            if isinstance(j[0], dict):
                return _rows_from_list_of_dicts(j)
        if isinstance(j, dict) and isinstance(j.get("data"), list):
            jlist = j["data"]
            if jlist and isinstance(jlist[0], list):
                return _rows_from_list_of_lists(jlist)
            if jlist and isinstance(jlist[0], dict):
                return _rows_from_list_of_dicts(jlist)
    except Exception as e:
        logger.debug(f"normalize_response_to_candles error for {provider_name} {url}: {e}")
    return []
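# Worked example with a Binance-style klines payload (values illustrative):
#   normalize_response_to_candles("binance", url,
#       [[1700000000000, "42000", "42500", "41800", "42300", "1234.5"]],
#       "BTCUSDT", "1h")
#   -> [{"ts": datetime(2023, 11, 14, 22, 13, 20), "open": 42000.0,
#        "high": 42500.0, "low": 41800.0, "close": 42300.0, "volume": 1234.5}]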
# Save rows into DB
def save_candles(engine, ohlc_table, provider_name: str, symbol: str, timeframe: str, rows: List[Dict[str, Any]]):
if not rows:
return 0
conn = engine.connect()
trans = conn.begin() # Start transaction
ins = ohlc_table.insert()
count = 0
try:
for r in rows:
try:
# TS may be None; skip
if not r.get("ts"):
continue
rec = {
"provider": provider_name,
"symbol": symbol,
"timeframe": timeframe,
"ts": r["ts"],
"open": r["open"],
"high": r["high"],
"low": r["low"],
"close": r["close"],
"volume": r["volume"],
"raw": json.dumps(r, default=str)
}
conn.execute(ins.values(**rec))
count += 1
except Exception as e:
logger.debug(f"db insert error: {e}")
trans.commit() # Commit transaction
except Exception as e:
trans.rollback() # Rollback on error
logger.error(f"Failed to save candles: {e}")
finally:
conn.close()
return count
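# Design note: rows are inserted one statement at a time inside a single
# transaction so that a malformed row is skipped without losing the rest of
# the batch; a bulk executemany-style insert would be faster but would trade
# away that per-row error isolation.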
# Attempt REST fetch against an endpoint; returns parsed JSON or None
def try_fetch_http(client: httpx.Client, url: str, headers: Dict[str, str], params: Optional[Dict[str, Any]] = None, timeout: int = 20):
try:
resp = client.get(url, headers=headers, params=params, timeout=timeout)
resp.raise_for_status()
ct = resp.headers.get("content-type", "")
if "application/json" in ct or resp.text.strip().startswith(("{", "[")):
return resp.json()
# maybe CSV? ignore for now
return None
except httpx.HTTPStatusError as e:
logger.debug(f"HTTP {e.response.status_code} for {url}")
return None
except Exception as e:
logger.debug(f"HTTP fetch failed for {url}: {e}")
return None
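# Example usage (placeholder URL, not a registry provider):
#   with httpx.Client(headers={"User-Agent": USER_AGENT}) as c:
#       data = try_fetch_http(c, "https://api.example.com/api/v3/klines?symbol=BTCUSDT&interval=1h&limit=2", {})
#   # data is parsed JSON on success, or None on HTTP errors / non-JSON bodies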
# Main worker: fetch candles for provided symbols and timeframe
def run_worker(symbols: List[str], timeframe: str, limit: int = DEFAULT_LIMIT, once: bool = True, interval: int = 300, max_providers: Optional[int] = None):
providers = build_providers_list()
logger.info(f"Providers loaded: {len(providers)}")
ohlc_providers = discover_ohlc_providers(providers)
logger.info(f"Providers with candidate OHLC endpoints: {len(ohlc_providers)}")
# DB setup
engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
ohlc_table = ensure_db_and_table(engine)
logger.info(f"Database initialized: {DB_PATH}")
client = httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=30)
summary = {
"run_started": datetime.utcnow().isoformat(),
"providers_tested": 0,
"candles_saved": 0,
"errors": [],
"successful_providers": []
}
try:
while True:
for sym in symbols:
tested = 0
saved_for_symbol = False
# try each provider until we succeed
for item in ohlc_providers:
if max_providers and tested >= max_providers:
break
                    p = item["provider"]
                    pname = p.get("name") or p.get("base_url") or "unknown_provider"
                    endpoints = item.get("endpoints", [])
                    tested += 1  # count every provider attempted so --max-providers caps attempts, not successes
for ep in endpoints:
url_template = ep.get("url")
if not url_template:
continue
url = render_template(url_template, sym, normalize_timeframe(timeframe), limit)
                        # If the URL is not fully qualified, try to build it from the provider's base_url
                        if not url.startswith("http"):
                            base = p.get("base_url") or p.get("url") or p.get("api")
                            if base:
                                url = base.rstrip("/") + "/" + url.lstrip("/")
logger.info(f"Trying provider {pname} -> {url}")
summary["providers_tested"] += 1
headers = {}
                        # attach a bearer token when the provider record carries an API key
                        if p.get("api_key"):
                            headers["Authorization"] = f"Bearer {p.get('api_key')}"
# try HTTP REST
j = try_fetch_http(client, url, headers)
if j:
rows = normalize_response_to_candles(pname, url, j, sym, timeframe)
if rows:
saved = save_candles(engine, ohlc_table, pname, sym, timeframe, rows)
summary["candles_saved"] += saved
logger.info(f"✓ Saved {saved} candles from {pname} for {sym}")
# Track successful provider
if pname not in summary["successful_providers"]:
summary["successful_providers"].append(pname)
                                saved_for_symbol = True
                                break  # found a working endpoint for this provider
if saved_for_symbol:
break # Move to next symbol
if not saved_for_symbol:
logger.warning(f"Could not fetch OHLC data for {sym} from any provider")
# write summary to filesystem each iteration
summary["last_run"] = datetime.utcnow().isoformat()
with open(SUMMARY_PATH, "w", encoding="utf-8") as sf:
json.dump(summary, sf, indent=2, ensure_ascii=False)
logger.info(f"Summary: Tested {summary['providers_tested']} endpoints, saved {summary['candles_saved']} candles")
# exit if once
if once:
break
logger.info(f"Worker sleeping for {interval}s ...")
time.sleep(interval)
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.exception(f"Worker fatal error: {e}")
summary["errors"].append(str(e))
with open(SUMMARY_PATH, "w", encoding="utf-8") as sf:
json.dump(summary, sf, indent=2, ensure_ascii=False)
finally:
client.close()
logger.info(f"Worker finished. Summary written to {SUMMARY_PATH}")
# CLI
def parse_args():
ap = argparse.ArgumentParser(description="Enhanced OHLC worker that uses provider registry files")
ap.add_argument("--symbols", type=str, help="Comma-separated list of symbols (e.g. BTCUSDT,ETHUSDT). If omitted, will try to read from crypto resources.", default=None)
ap.add_argument("--timeframe", type=str, default=DEFAULT_TIMEFRAME)
ap.add_argument("--limit", type=int, default=DEFAULT_LIMIT)
ap.add_argument("--once", action="store_true", help="Run once and exit (default True)")
ap.add_argument("--loop", action="store_true", help="Run continuously (overrides --once)")
ap.add_argument("--interval", type=int, default=300, help="Sleep seconds between runs when using --loop")
ap.add_argument("--max-providers", type=int, default=None, help="Max providers to try per symbol")
return ap.parse_args()
def symbols_from_resources() -> List[str]:
res = read_json(CRYPTO_RESOURCES) or {}
symbols = []
# try to find common fields: trading_pairs, markets, pairs
if isinstance(res, dict):
for key in ("trading_pairs", "pairs", "markets", "symbols", "items"):
arr = res.get(key)
if isinstance(arr, list) and len(arr) > 0:
# some entries may be objects with symbol/key
                for item in arr[:20]:  # limit to the first 20 symbols to avoid overwhelming providers
if isinstance(item, str):
symbols.append(item)
elif isinstance(item, dict):
s = item.get("symbol") or item.get("pair") or item.get("id") or item.get("name")
if s:
symbols.append(s)
break # Use first matching key only
# dedupe
return list(dict.fromkeys(symbols))
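# Example resource shapes this function understands (hypothetical):
#   {"trading_pairs": ["BTCUSDT", "ETHUSDT"]}            -> ["BTCUSDT", "ETHUSDT"]
#   {"markets": [{"symbol": "BTC-USDT"}, {"id": "eth"}]} -> ["BTC-USDT", "eth"]
# Only the first matching key is used, capped at 20 entries.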
if __name__ == "__main__":
args = parse_args()
syms = []
if args.symbols:
syms = [s.strip() for s in args.symbols.split(",") if s.strip()]
else:
syms = symbols_from_resources()
if not syms:
logger.warning(f"No symbols found in {CRYPTO_RESOURCES}, defaulting to ['BTCUSDT', 'ETHUSDT']")
syms = ["BTCUSDT", "ETHUSDT"]
logger.info(f"Starting OHLC worker for symbols: {', '.join(syms)}")
once_flag = not args.loop
run_worker(symbols=syms, timeframe=args.timeframe, limit=args.limit, once=once_flag, interval=args.interval, max_providers=args.max_providers)