Datasourceforcryptocurrency-2 / workers /simple_market_collector.py
Really-amin's picture
Upload 856 files
d115c85 verified
"""
Simple Market Data Collector - Direct Implementation
Fetches data from free APIs and stores in database
No complex dependencies - just works!
"""
import asyncio
import sys
from datetime import datetime, timezone
from pathlib import Path
import httpx
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from database.db_manager import DatabaseManager
from database.models import MarketPrice, OHLC, SentimentMetric
from utils.logger import setup_logger
logger = setup_logger("simple_market_collector")
class SimpleMarketCollector:
"""
Simple, direct collector for market data from free APIs
"""
def __init__(self, db_path: str = "data/api_monitor.db"):
"""Initialize collector"""
self.db_manager = DatabaseManager(db_path=db_path)
self.db_manager.init_database()
self.running = False
# Free APIs (no authentication required)
self.coingecko_url = "https://api.coingecko.com/api/v3/simple/price"
self.binance_url = "https://api.binance.com/api/v3/ticker/24hr"
self.binance_klines_url = "https://api.binance.com/api/v3/klines"
self.fear_greed_url = "https://api.alternative.me/fng/"
logger.info("βœ… Simple Market Collector initialized")
async def fetch_coingecko_prices(self) -> dict:
"""
Fetch prices from CoinGecko (FREE - no key needed)
Returns:
Dictionary with price data or empty dict on error
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
params = {
"ids": "bitcoin,ethereum,binancecoin,tron,solana",
"vs_currencies": "usd",
"include_market_cap": "true",
"include_24hr_vol": "true",
"include_24hr_change": "true"
}
response = await client.get(self.coingecko_url, params=params)
response.raise_for_status()
data = response.json()
logger.info(f"βœ… CoinGecko: Retrieved {len(data)} coins")
return data
except Exception as e:
logger.error(f"❌ CoinGecko fetch failed: {str(e)}")
return {}
async def fetch_binance_prices(self) -> list:
"""
Fetch prices from Binance (FREE - no key needed)
Returns:
List of price data or empty list on error
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(self.binance_url)
response.raise_for_status()
data = response.json()
# Filter for major USDT pairs
major_pairs = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"]
filtered = [item for item in data if item["symbol"] in major_pairs]
logger.info(f"βœ… Binance: Retrieved {len(filtered)} pairs")
return filtered
except Exception as e:
logger.error(f"❌ Binance fetch failed: {str(e)}")
return []
async def fetch_binance_ohlc(self, symbol: str = "BTCUSDT", interval: str = "1h", limit: int = 100) -> list:
"""
Fetch OHLC candlestick data from Binance (FREE - no key needed)
Args:
symbol: Trading pair (e.g., "BTCUSDT")
interval: Timeframe (1m, 5m, 15m, 1h, 4h, 1d)
limit: Number of candles (max 1000)
Returns:
List of OHLC data or empty list on error
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
params = {
"symbol": symbol,
"interval": interval,
"limit": limit
}
response = await client.get(self.binance_klines_url, params=params)
response.raise_for_status()
data = response.json()
logger.info(f"βœ… Binance OHLC: Retrieved {len(data)} candles for {symbol} ({interval})")
return data
except Exception as e:
logger.error(f"❌ Binance OHLC fetch failed: {str(e)}")
return []
async def fetch_fear_greed_index(self) -> dict:
"""
Fetch Fear & Greed Index from Alternative.me (FREE - no key needed)
Returns:
Dictionary with fear & greed data or empty dict on error
"""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(self.fear_greed_url)
response.raise_for_status()
data = response.json()
logger.info(f"βœ… Fear & Greed: Retrieved index")
return data
except Exception as e:
logger.error(f"❌ Fear & Greed fetch failed: {str(e)}")
return {}
def store_coingecko_data(self, data: dict) -> int:
"""
Store CoinGecko data in database
Args:
data: CoinGecko API response data
Returns:
Number of records stored
"""
if not data:
return 0
# Map CoinGecko IDs to symbols
symbol_map = {
"bitcoin": "BTC",
"ethereum": "ETH",
"binancecoin": "BNB",
"tron": "TRX",
"solana": "SOL"
}
stored_count = 0
try:
with self.db_manager.get_session() as session:
for coin_id, coin_data in data.items():
if not isinstance(coin_data, dict):
continue
symbol = symbol_map.get(coin_id, coin_id.upper())
# Create MarketPrice record
price_record = MarketPrice(
symbol=symbol,
price_usd=coin_data.get("usd", 0),
market_cap=coin_data.get("usd_market_cap"),
volume_24h=coin_data.get("usd_24h_vol"),
price_change_24h=coin_data.get("usd_24h_change"),
timestamp=datetime.now(timezone.utc),
source="CoinGecko"
)
session.add(price_record)
stored_count += 1
session.commit()
logger.info(f"πŸ’Ύ Stored {stored_count} CoinGecko records")
return stored_count
except Exception as e:
logger.error(f"❌ Error storing CoinGecko data: {str(e)}", exc_info=True)
return 0
def store_binance_data(self, data: list) -> int:
"""
Store Binance data in database
Args:
data: Binance API response data
Returns:
Number of records stored
"""
if not data:
return 0
stored_count = 0
try:
with self.db_manager.get_session() as session:
for item in data:
# Extract symbol (remove USDT suffix)
symbol = item["symbol"].replace("USDT", "")
# Create MarketPrice record
price_record = MarketPrice(
symbol=symbol,
price_usd=float(item.get("lastPrice", 0)),
market_cap=None, # Binance doesn't provide this
volume_24h=float(item.get("quoteVolume", 0)),
price_change_24h=float(item.get("priceChangePercent", 0)),
timestamp=datetime.now(timezone.utc),
source="Binance"
)
session.add(price_record)
stored_count += 1
session.commit()
logger.info(f"πŸ’Ύ Stored {stored_count} Binance records")
return stored_count
except Exception as e:
logger.error(f"❌ Error storing Binance data: {str(e)}", exc_info=True)
return 0
def store_ohlc_data(self, data: list, symbol: str, interval: str) -> int:
"""
Store OHLC candlestick data in database
Args:
data: Binance klines API response
symbol: Trading pair symbol
interval: Timeframe interval
Returns:
Number of candles stored
"""
if not data:
return 0
stored_count = 0
try:
with self.db_manager.get_session() as session:
for candle in data:
# Binance kline format: [timestamp, open, high, low, close, volume, ...]
ohlc_record = OHLC(
symbol=symbol.replace("USDT", ""),
interval=interval,
ts=datetime.fromtimestamp(candle[0] / 1000, tz=timezone.utc),
open=float(candle[1]),
high=float(candle[2]),
low=float(candle[3]),
close=float(candle[4]),
volume=float(candle[5]),
source="Binance",
stored_at=datetime.now(timezone.utc)
)
session.add(ohlc_record)
stored_count += 1
session.commit()
logger.info(f"πŸ’Ύ Stored {stored_count} OHLC candles ({symbol} {interval})")
return stored_count
except Exception as e:
logger.error(f"❌ Error storing OHLC data: {str(e)}", exc_info=True)
return 0
def store_fear_greed_data(self, data: dict) -> int:
"""
Store Fear & Greed Index in database
Args:
data: Alternative.me API response
Returns:
Number of records stored
"""
if not data or "data" not in data:
return 0
stored_count = 0
try:
with self.db_manager.get_session() as session:
for item in data["data"][:1]: # Just store the latest
sentiment_record = SentimentMetric(
metric_name="fear_greed_index",
value=float(item.get("value", 0)),
classification=item.get("value_classification", "unknown"),
timestamp=datetime.fromtimestamp(int(item.get("timestamp", 0)), tz=timezone.utc),
source="Alternative.me"
)
session.add(sentiment_record)
stored_count += 1
session.commit()
logger.info(f"πŸ’Ύ Stored Fear & Greed: {data['data'][0].get('value_classification')}")
return stored_count
except Exception as e:
logger.error(f"❌ Error storing Fear & Greed data: {str(e)}", exc_info=True)
return 0
async def run_collection_cycle(self):
"""Run one complete collection cycle"""
logger.info("=" * 80)
logger.info("πŸš€ Starting collection cycle")
logger.info("=" * 80)
total_stored = 0
# 1. Fetch and store CoinGecko data
logger.info("πŸ“Š Collecting from CoinGecko...")
coingecko_data = await self.fetch_coingecko_prices()
total_stored += self.store_coingecko_data(coingecko_data)
# 2. Fetch and store Binance data
logger.info("πŸ“Š Collecting from Binance...")
binance_data = await self.fetch_binance_prices()
total_stored += self.store_binance_data(binance_data)
# 3. Fetch and store OHLC data (hourly candles for BTC)
logger.info("πŸ“Š Collecting OHLC candles...")
ohlc_data = await self.fetch_binance_ohlc(symbol="BTCUSDT", interval="1h", limit=24)
total_stored += self.store_ohlc_data(ohlc_data, symbol="BTCUSDT", interval="1h")
# 4. Fetch and store Fear & Greed Index
logger.info("πŸ“Š Collecting Fear & Greed Index...")
fear_greed_data = await self.fetch_fear_greed_index()
total_stored += self.store_fear_greed_data(fear_greed_data)
# 5. Show statistics
db_stats = self.db_manager.get_database_stats()
logger.info("=" * 80)
logger.info(f"βœ… Collection cycle complete!")
logger.info(f" New records: {total_stored}")
logger.info(f" Total market_prices: {db_stats.get('market_prices', 0)}")
logger.info(f" Database size: {db_stats.get('database_size_mb', 0)} MB")
logger.info("=" * 80)
async def start_continuous(self, interval_seconds: int = 60):
"""
Start continuous collection
Args:
interval_seconds: Seconds between collections
"""
self.running = True
logger.info(f"πŸ”„ Starting continuous collection (every {interval_seconds}s)")
logger.info(f" Press Ctrl+C to stop\n")
cycle_num = 0
while self.running:
try:
cycle_num += 1
logger.info(f"\nπŸ”„ Cycle #{cycle_num}")
await self.run_collection_cycle()
if self.running:
logger.info(f"\n⏳ Waiting {interval_seconds}s until next cycle...\n")
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
break
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"❌ Cycle error: {str(e)}", exc_info=True)
await asyncio.sleep(10)
def stop(self):
"""Stop collection"""
self.running = False
logger.info("πŸ›‘ Stopping collection...")
async def main():
"""Main entry point"""
logger.info("=" * 80)
logger.info("πŸš€ SIMPLE MARKET DATA COLLECTOR")
logger.info(" Collects from FREE APIs: CoinGecko + Binance")
logger.info("=" * 80)
collector = SimpleMarketCollector()
try:
# Run continuous collection (60 second interval)
await collector.start_continuous(interval_seconds=60)
except KeyboardInterrupt:
logger.info("\nπŸ›‘ Received shutdown signal")
finally:
collector.stop()
logger.info("πŸ‘‹ Collector stopped\n")
if __name__ == "__main__":
asyncio.run(main())