#!/usr/bin/env python3 """ ohlc_worker_enhanced.py Enhanced OHLC worker that uses provider registry files to discover and fetch from multiple providers. - Loads provider registries: providers_registered.json providers_config_extended.json crypto_resources_unified_2025-11-11.json WEBSOCKET_URL_FIX.json - Discovers providers that expose OHLC/candles/klines endpoints - Fetches OHLC data via REST (prefers REST, minimal WebSocket usage) - Stores normalized rows into SQLite DB: data/crypto_monitor.db - Produces summary log and JSON output Usage: python workers/ohlc_worker_enhanced.py --once --symbols BTC-USDT,ETH-USDT --timeframe 1h or run in loop: python workers/ohlc_worker_enhanced.py --loop --interval 300 """ import os import json import time import argparse import logging from typing import List, Dict, Any, Optional from datetime import datetime, timedelta from pathlib import Path import sys # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) # httpx for HTTP requests try: import httpx except ImportError: print("ERROR: httpx not installed. Run: pip install httpx") sys.exit(1) # sqlite via sqlalchemy for convenience try: from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, text from sqlalchemy.exc import OperationalError except ImportError: print("ERROR: sqlalchemy not installed. Run: pip install sqlalchemy") sys.exit(1) # Determine base directory (repo root) BASE_DIR = Path(__file__).parent.parent.resolve() # Config / paths (override via env, default to local structure) PROVIDERS_REGISTERED = os.environ.get("OHLC_PROVIDERS_REGISTERED", str(BASE_DIR / "data" / "providers_registered.json")) PROVIDERS_CONFIG = os.environ.get("OHLC_PROVIDERS_CONFIG", str(BASE_DIR / "app" / "providers_config_extended.json")) CRYPTO_RESOURCES = os.environ.get("OHLC_CRYPTO_RESOURCES", str(BASE_DIR / "api-resources" / "crypto_resources_unified_2025-11-11.json")) WEBSOCKET_FIX = os.environ.get("OHLC_WEBSOCKET_FIX", str(BASE_DIR / "WEBSOCKET_URL_FIX.json")) EXCHANGE_ENDPOINTS = os.environ.get("OHLC_EXCHANGE_ENDPOINTS", str(BASE_DIR / "data" / "exchange_ohlc_endpoints.json")) DB_PATH = os.environ.get("OHLC_DB", str(BASE_DIR / "data" / "crypto_monitor.db")) LOG_PATH = os.environ.get("OHLC_LOG", str(BASE_DIR / "tmp" / "ohlc_worker_enhanced.log")) SUMMARY_PATH = os.environ.get("OHLC_SUMMARY", str(BASE_DIR / "tmp" / "ohlc_worker_enhanced_summary.json")) # Ensure tmp directory exists Path(LOG_PATH).parent.mkdir(parents=True, exist_ok=True) # Defaults DEFAULT_TIMEFRAME = "1h" DEFAULT_LIMIT = 200 # number of candles to request per fetch when supported USER_AGENT = "ohlc_worker_enhanced/1.0" # Logging logging.basicConfig( filename=LOG_PATH, filemode="a", level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("ohlc_worker_enhanced") console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) # Utility: read json safely def read_json(path: str) -> Any: try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: logger.warning(f"File not found: {path}") return None except json.JSONDecodeError as e: logger.warning(f"JSON decode error for {path}: {e}") return None except Exception as e: logger.warning(f"read_json failed for {path}: {e}") return None # Normalize provider records: try to extract REST OHLC endpoints def extract_ohlc_endpoints(provider_record: Dict[str, Any]) -> List[Dict[str, Any]]: """ provider_record: dict that may contain keys: - base_url, endpoints, api_endpoints, ohlc, candles, kline Return list of endpoint dicts: {url_template, method, params_spec} url_template may include placeholders {symbol} {timeframe} {limit} etc. """ endpoints = [] # common places candidates = [] if isinstance(provider_record, dict): for key in ("endpoints", "api_endpoints", "endpoints_list"): if key in provider_record and isinstance(provider_record[key], list): candidates.extend(provider_record[key]) elif key in provider_record and isinstance(provider_record[key], dict): # Sometimes endpoints is a dict with keys for different endpoint types for ep_key, ep_val in provider_record[key].items(): if isinstance(ep_val, (str, dict)): candidates.append(ep_val) # sometimes endpoints are top-level keys for pkey in ("ohlc", "candles", "kline", "klines", "ticker"): if pkey in provider_record: candidates.append(provider_record[pkey]) # fallback: base_url + known path templates base = provider_record.get("base_url") or provider_record.get("url") or provider_record.get("api") if base: # Check if base URL suggests it's a known exchange base_lower = base.lower() if "binance" in base_lower: # Binance-style endpoints common_templates = [ {"url": base.rstrip("/") + "/api/v3/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"}, {"url": base.rstrip("/") + "/api/v1/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"}, ] candidates.extend(common_templates) elif any(x in base_lower for x in ["kraken", "coinbase", "okx", "huobi", "bybit"]): # Generic exchange templates - these are just candidates, actual usage will depend on testing common_templates = [ {"url": base.rstrip("/") + "/api/v1/klines?symbol={symbol}&interval={timeframe}&limit={limit}", "method": "GET"}, {"url": base.rstrip("/") + "/v1/market/candles?market={symbol}&period={timeframe}&limit={limit}", "method": "GET"}, ] candidates.extend(common_templates) # normalize candidate entries for c in candidates: if isinstance(c, str): # Check if string looks like it contains OHLC-related keywords if any(kw in c.lower() for kw in ["kline", "candle", "ohlc", "chart", "history"]): endpoints.append({"url": c, "method": "GET"}) elif isinstance(c, dict): # prefer url, template, path url = c.get("url") or c.get("template") or c.get("path") or c.get("endpoint") if url and isinstance(url, str): # Check if URL looks OHLC-related if any(kw in url.lower() for kw in ["kline", "candle", "ohlc", "chart", "history"]): endpoints.append({"url": url, "method": c.get("method", "GET"), "meta": c.get("meta")}) else: # sometimes entries are nested - store the whole dict as meta endpoints.append({"url": json.dumps(c), "method": "GET", "meta": c}) return endpoints # Guess if an endpoint template can produce REST OHLC for given symbol/timeframe def render_template(url_template: str, symbol: str, timeframe: str, limit: int) -> str: try: return url_template.format(symbol=symbol, timeframe=timeframe, limit=limit) except KeyError: # try simple replacements for common placeholders u = url_template.replace("{symbol}", symbol) u = u.replace("{timeframe}", timeframe).replace("{interval}", timeframe) u = u.replace("{limit}", str(limit)) # Also try {market} as alias for symbol u = u.replace("{market}", symbol).replace("{pair}", symbol) return u except Exception: return url_template # Minimal parser for common exchange interval names def normalize_timeframe(tf: str) -> str: # map friendly timeframes to common exchange-style intervals mapping = { "1m": "1m", "5m": "5m", "15m": "15m", "30m": "30m", "1h": "1h", "4h": "4h", "1d": "1d", "1w": "1w" } return mapping.get(tf, tf) # DB: ensures table exists and inserts normalized candle rows def ensure_db_and_table(engine): meta = MetaData() ohlc = Table( "ohlc_data", meta, Column("id", Integer, primary_key=True, autoincrement=True), Column("provider", String(128), index=True), Column("symbol", String(64), index=True), Column("timeframe", String(16), index=True), Column("ts", DateTime, index=True), # candle start time Column("open", Float), Column("high", Float), Column("low", Float), Column("close", Float), Column("volume", Float), Column("raw", String), # raw JSON as string if needed extend_existing=True ) meta.create_all(engine) return ohlc # Normalize provider list from the three JSON files def build_providers_list() -> List[Dict[str, Any]]: # Priority: providers_config_extended.json -> providers_registered.json -> providers from crypto_resources providers = [] # Read all config files cfg = read_json(PROVIDERS_CONFIG) or {} reg = read_json(PROVIDERS_REGISTERED) or {} resources = read_json(CRYPTO_RESOURCES) or {} ws_map = read_json(WEBSOCKET_FIX) or {} logger.info(f"Loading providers from config files:") logger.info(f" - {PROVIDERS_CONFIG}: {'OK' if cfg else 'SKIP'}") logger.info(f" - {PROVIDERS_REGISTERED}: {'OK' if reg else 'SKIP'}") logger.info(f" - {CRYPTO_RESOURCES}: {'OK' if resources else 'SKIP'}") logger.info(f" - {WEBSOCKET_FIX}: {'OK' if ws_map else 'SKIP'}") # Also load exchange endpoints config exchange_cfg = read_json(EXCHANGE_ENDPOINTS) or {} logger.info(f" - {EXCHANGE_ENDPOINTS}: {'OK' if exchange_cfg else 'SKIP'}") # providers from config for p in (cfg.get("providers") if isinstance(cfg, dict) else cfg if isinstance(cfg, list) else []) or []: if isinstance(p, dict): p["_source_file"] = PROVIDERS_CONFIG p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url")) providers.append(p) # registered providers if isinstance(reg, list): for p in reg: if isinstance(p, dict): p["_source_file"] = PROVIDERS_REGISTERED p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url")) providers.append(p) elif isinstance(reg, dict): for p in (reg.get("providers") or []): if isinstance(p, dict): p["_source_file"] = PROVIDERS_REGISTERED p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url")) providers.append(p) # also scan resources (some resources include provider endpoints) if isinstance(resources, dict): # resources may contain provider entries or endpoints entries = resources.get("providers") or resources.get("exchanges") or [] for p in entries: if isinstance(p, dict): p["_source_file"] = CRYPTO_RESOURCES p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url")) providers.append(p) # Load exchange endpoints (highest priority - these are known working endpoints) if isinstance(exchange_cfg, dict): for p in (exchange_cfg.get("providers") or []): if isinstance(p, dict): p["_source_file"] = EXCHANGE_ENDPOINTS p["_ws_url"] = ws_map.get(p.get("name")) or ws_map.get(p.get("base_url")) # Mark these as high priority p["_priority"] = "high" providers.append(p) # deduplicate by base_url or name seen = set() unique = [] for p in providers: key = (p.get("base_url") or p.get("name") or p.get("id") or "")[:200] if key and key not in seen: seen.add(key) unique.append(p) logger.info(f"Loaded {len(unique)} unique providers") return unique # Discover candidate REST endpoints for OHLC from providers def discover_ohlc_providers(providers: List[Dict[str, Any]]) -> List[Dict[str, Any]]: out = [] for p in providers: eps = extract_ohlc_endpoints(p) if eps: out.append({"provider": p, "endpoints": eps}) return out # Fetch and normalize various provider response shapes to standardized OHLC rows def normalize_response_to_candles(provider_name: str, url: str, resp_json: Any, symbol: str, timeframe: str) -> List[Dict[str, Any]]: """ try common response shapes: - list of lists: [[ts, open, high, low, close, vol], ...] - dict with 'data' key which is list of lists or dicts - list of dicts: [{"time":..., "open":..., "high":..., "low":..., "close":..., "volume":...}, ...] Return list of rows: {ts, open, high, low, close, volume} """ rows = [] try: j = resp_json # common: list-of-lists (binance-like) if isinstance(j, list) and len(j) > 0 and isinstance(j[0], list): for item in j: if not item or len(item) < 6: continue # some APIs: [openTime, open, high, low, close, volume, ...] ts = int(item[0]) if item and item[0] else None o = float(item[1]) h = float(item[2]) l = float(item[3]) c = float(item[4]) v = float(item[5]) # Convert timestamp (handle both milliseconds and seconds) if ts: ts_dt = datetime.utcfromtimestamp(ts / 1000) if ts > 1e10 else datetime.utcfromtimestamp(ts) else: ts_dt = None rows.append({"ts": ts_dt, "open": o, "high": h, "low": l, "close": c, "volume": v}) return rows # dict with data key if isinstance(j, dict): # check variants if "data" in j and isinstance(j["data"], list): jlist = j["data"] # check if list-of-lists or list-of-dicts if len(jlist) > 0 and isinstance(jlist[0], list): # same as above for item in jlist: if not item or len(item) < 6: continue ts = int(item[0]) if item and item[0] else None o = float(item[1]) h = float(item[2]) l = float(item[3]) c = float(item[4]) v = float(item[5]) if ts: ts_dt = datetime.utcfromtimestamp(ts / 1000) if ts > 1e10 else datetime.utcfromtimestamp(ts) else: ts_dt = None rows.append({"ts": ts_dt, "open": o, "high": h, "low": l, "close": c, "volume": v}) return rows elif len(jlist) > 0 and isinstance(jlist[0], dict): for item in jlist: ts = item.get("time") or item.get("ts") or item.get("timestamp") or item.get("date") # try to parse ts if isinstance(ts, (int, float)): tval = datetime.utcfromtimestamp(int(ts) / 1000) if int(ts) > 1e10 else datetime.utcfromtimestamp(int(ts)) else: try: tval = datetime.fromisoformat(str(ts).replace("Z", "+00:00")) except Exception: tval = None o = float(item.get("open") or item.get("o") or 0) h = float(item.get("high") or item.get("h") or 0) l = float(item.get("low") or item.get("l") or 0) c = float(item.get("close") or item.get("c") or 0) v = float(item.get("volume") or item.get("v") or 0) rows.append({"ts": tval, "open": o, "high": h, "low": l, "close": c, "volume": v}) return rows # list of dicts if isinstance(j, list) and len(j) > 0 and isinstance(j[0], dict): for item in j: ts = item.get("time") or item.get("ts") or item.get("timestamp") or item.get("date") if isinstance(ts, (int, float)): tval = datetime.utcfromtimestamp(int(ts) / 1000) if int(ts) > 1e10 else datetime.utcfromtimestamp(int(ts)) else: try: tval = datetime.fromisoformat(str(ts).replace("Z", "+00:00")) except Exception: tval = None o = float(item.get("open") or item.get("o") or 0) h = float(item.get("high") or item.get("h") or 0) l = float(item.get("low") or item.get("l") or 0) c = float(item.get("close") or item.get("c") or 0) v = float(item.get("volume") or item.get("v") or 0) rows.append({"ts": tval, "open": o, "high": h, "low": l, "close": c, "volume": v}) return rows except Exception as e: logger.debug(f"normalize_response_to_candles error for {provider_name} {url}: {e}") return rows # Save rows into DB def save_candles(engine, ohlc_table, provider_name: str, symbol: str, timeframe: str, rows: List[Dict[str, Any]]): if not rows: return 0 conn = engine.connect() trans = conn.begin() # Start transaction ins = ohlc_table.insert() count = 0 try: for r in rows: try: # TS may be None; skip if not r.get("ts"): continue rec = { "provider": provider_name, "symbol": symbol, "timeframe": timeframe, "ts": r["ts"], "open": r["open"], "high": r["high"], "low": r["low"], "close": r["close"], "volume": r["volume"], "raw": json.dumps(r, default=str) } conn.execute(ins.values(**rec)) count += 1 except Exception as e: logger.debug(f"db insert error: {e}") trans.commit() # Commit transaction except Exception as e: trans.rollback() # Rollback on error logger.error(f"Failed to save candles: {e}") finally: conn.close() return count # Attempt REST fetch against an endpoint; returns parsed JSON or None def try_fetch_http(client: httpx.Client, url: str, headers: Dict[str, str], params: Dict[str, Any] = None, timeout=20): try: resp = client.get(url, headers=headers, params=params, timeout=timeout) resp.raise_for_status() ct = resp.headers.get("content-type", "") if "application/json" in ct or resp.text.strip().startswith(("{", "[")): return resp.json() # maybe CSV? ignore for now return None except httpx.HTTPStatusError as e: logger.debug(f"HTTP {e.response.status_code} for {url}") return None except Exception as e: logger.debug(f"HTTP fetch failed for {url}: {e}") return None # Main worker: fetch candles for provided symbols and timeframe def run_worker(symbols: List[str], timeframe: str, limit: int = DEFAULT_LIMIT, once: bool = True, interval: int = 300, max_providers: Optional[int] = None): providers = build_providers_list() logger.info(f"Providers loaded: {len(providers)}") ohlc_providers = discover_ohlc_providers(providers) logger.info(f"Providers with candidate OHLC endpoints: {len(ohlc_providers)}") # DB setup engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False}) ohlc_table = ensure_db_and_table(engine) logger.info(f"Database initialized: {DB_PATH}") client = httpx.Client(headers={"User-Agent": USER_AGENT}, timeout=30) summary = { "run_started": datetime.utcnow().isoformat(), "providers_tested": 0, "candles_saved": 0, "errors": [], "successful_providers": [] } try: while True: for sym in symbols: tested = 0 saved_for_symbol = False # try each provider until we succeed for item in ohlc_providers: if max_providers and tested >= max_providers: break p = item["provider"] pname = p.get("name") or p.get("base_url") or "unknown_provider" endpoints = item.get("endpoints", []) for ep in endpoints: url_template = ep.get("url") if not url_template: continue url = render_template(url_template, sym, normalize_timeframe(timeframe), limit) # If url seems not fully qualified, try to build with base_url if url.startswith("{") or not url.startswith("http"): base = p.get("base_url") or p.get("url") or p.get("api") if base: url = (base.rstrip("/") + "/" + url.lstrip("/")) logger.info(f"Trying provider {pname} -> {url}") summary["providers_tested"] += 1 headers = {} # include token if provider has key in record (non-sensitive) if p.get("api_key"): headers["Authorization"] = f"Bearer {p.get('api_key')}" # try HTTP REST j = try_fetch_http(client, url, headers) if j: rows = normalize_response_to_candles(pname, url, j, sym, timeframe) if rows: saved = save_candles(engine, ohlc_table, pname, sym, timeframe, rows) summary["candles_saved"] += saved logger.info(f"✓ Saved {saved} candles from {pname} for {sym}") # Track successful provider if pname not in summary["successful_providers"]: summary["successful_providers"].append(pname) saved_for_symbol = True tested += 1 break # Found working endpoint for this provider if saved_for_symbol: break # Move to next symbol if not saved_for_symbol: logger.warning(f"Could not fetch OHLC data for {sym} from any provider") # write summary to filesystem each iteration summary["last_run"] = datetime.utcnow().isoformat() with open(SUMMARY_PATH, "w", encoding="utf-8") as sf: json.dump(summary, sf, indent=2, ensure_ascii=False) logger.info(f"Summary: Tested {summary['providers_tested']} endpoints, saved {summary['candles_saved']} candles") # exit if once if once: break logger.info(f"Worker sleeping for {interval}s ...") time.sleep(interval) except KeyboardInterrupt: logger.info("Interrupted by user") except Exception as e: logger.exception(f"Worker fatal error: {e}") summary["errors"].append(str(e)) with open(SUMMARY_PATH, "w", encoding="utf-8") as sf: json.dump(summary, sf, indent=2, ensure_ascii=False) finally: client.close() logger.info(f"Worker finished. Summary written to {SUMMARY_PATH}") # CLI def parse_args(): ap = argparse.ArgumentParser(description="Enhanced OHLC worker that uses provider registry files") ap.add_argument("--symbols", type=str, help="Comma-separated list of symbols (e.g. BTCUSDT,ETHUSDT). If omitted, will try to read from crypto resources.", default=None) ap.add_argument("--timeframe", type=str, default=DEFAULT_TIMEFRAME) ap.add_argument("--limit", type=int, default=DEFAULT_LIMIT) ap.add_argument("--once", action="store_true", help="Run once and exit (default True)") ap.add_argument("--loop", action="store_true", help="Run continuously (overrides --once)") ap.add_argument("--interval", type=int, default=300, help="Sleep seconds between runs when using --loop") ap.add_argument("--max-providers", type=int, default=None, help="Max providers to try per symbol") return ap.parse_args() def symbols_from_resources() -> List[str]: res = read_json(CRYPTO_RESOURCES) or {} symbols = [] # try to find common fields: trading_pairs, markets, pairs if isinstance(res, dict): for key in ("trading_pairs", "pairs", "markets", "symbols", "items"): arr = res.get(key) if isinstance(arr, list) and len(arr) > 0: # some entries may be objects with symbol/key for item in arr[:20]: # Limit to first 20 to avoid overwhelming if isinstance(item, str): symbols.append(item) elif isinstance(item, dict): s = item.get("symbol") or item.get("pair") or item.get("id") or item.get("name") if s: symbols.append(s) break # Use first matching key only # dedupe return list(dict.fromkeys(symbols)) if __name__ == "__main__": args = parse_args() syms = [] if args.symbols: syms = [s.strip() for s in args.symbols.split(",") if s.strip()] else: syms = symbols_from_resources() if not syms: logger.warning(f"No symbols found in {CRYPTO_RESOURCES}, defaulting to ['BTCUSDT', 'ETHUSDT']") syms = ["BTCUSDT", "ETHUSDT"] logger.info(f"Starting OHLC worker for symbols: {', '.join(syms)}") once_flag = not args.loop run_worker(symbols=syms, timeframe=args.timeframe, limit=args.limit, once=once_flag, interval=args.interval, max_providers=args.max_providers)