"""
Demonstration Script for All Collector Modules
This script demonstrates the usage of all collector modules and
provides a comprehensive overview of data collection capabilities.
"""
import asyncio
import json
import sys
from datetime import datetime
from typing import Dict, List, Any
# Import all collector functions
from collectors import (
collect_market_data,
collect_explorer_data,
collect_news_data,
collect_sentiment_data,
collect_onchain_data
)
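# The collectors are expected to return lists of result dictionaries. The
# sketch below is an assumption inferred from the fields this demo reads,
# not a definition taken from the collector modules themselves, which may
# expose additional or differently named keys.
from typing import TypedDict

class CollectorResult(TypedDict, total=False):
    provider: str              # data source name
    category: str              # e.g. "market_data", "news", "sentiment"
    success: bool
    response_time_ms: float    # present on success
    staleness_minutes: float   # optional data-age hint
    error: str                 # present on failure
    error_type: str
    # Provider-specific extras read opportunistically below:
    index_value: int           # Fear & Greed index value
    index_classification: str  # label shown next to the index value
    post_count: int
    article_count: int
    is_placeholder: bool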
def print_separator(title: str = ""):
"""Print a formatted separator line"""
if title:
print(f"\n{'='*70}")
print(f" {title}")
print(f"{'='*70}\n")
else:
print(f"{'='*70}\n")
def format_result_summary(result: Dict[str, Any]) -> str:
"""Format a single result for display"""
lines = []
lines.append(f"Provider: {result.get('provider', 'Unknown')}")
lines.append(f"Category: {result.get('category', 'Unknown')}")
lines.append(f"Success: {result.get('success', False)}")
if result.get('success'):
lines.append(f"Response Time: {result.get('response_time_ms', 0):.2f}ms")
staleness = result.get('staleness_minutes')
if staleness is not None:
lines.append(f"Data Staleness: {staleness:.2f} minutes")
# Add provider-specific info
        if result.get('index_value') is not None:
            lines.append(f"Fear & Greed Index: {result['index_value']} ({result.get('index_classification', 'Unknown')})")
        if result.get('post_count') is not None:
            lines.append(f"Posts: {result['post_count']}")
        if result.get('article_count') is not None:
            lines.append(f"Articles: {result['article_count']}")
if result.get('is_placeholder'):
lines.append("Status: PLACEHOLDER IMPLEMENTATION")
else:
lines.append(f"Error Type: {result.get('error_type', 'unknown')}")
lines.append(f"Error: {result.get('error', 'Unknown error')}")
return "\n".join(lines)
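# Illustrative example (values are hypothetical, not real collector output):
#   format_result_summary({"provider": "ExampleProvider", "category": "news",
#                          "success": False, "error": "HTTP 503",
#                          "error_type": "http_error"})
# returns the newline-joined lines:
#   Provider: ExampleProvider
#   Category: news
#   Success: False
#   Error Type: http_error
#   Error: HTTP 503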
def print_category_summary(category: str, results: List[Dict[str, Any]]):
"""Print summary for a category of collectors"""
print_separator(f"{category.upper()}")
total = len(results)
successful = sum(1 for r in results if r.get('success', False))
print(f"Total Collectors: {total}")
print(f"Successful: {successful}")
print(f"Failed: {total - successful}")
print()
for i, result in enumerate(results, 1):
print(f"[{i}/{total}] {'-'*60}")
print(format_result_summary(result))
print()
async def collect_all_data() -> Dict[str, List[Dict[str, Any]]]:
"""
Collect data from all categories concurrently
Returns:
Dictionary with categories as keys and results as values
"""
print_separator("Starting Data Collection from All Sources")
print(f"Timestamp: {datetime.utcnow().isoformat()}Z\n")
# Run all collectors concurrently
print("Executing all collectors in parallel...")
market_results, explorer_results, news_results, sentiment_results, onchain_results = await asyncio.gather(
collect_market_data(),
collect_explorer_data(),
collect_news_data(),
collect_sentiment_data(),
collect_onchain_data(),
return_exceptions=True
)
# Handle any exceptions
def handle_exception(result, category):
if isinstance(result, Exception):
return [{
"provider": "Unknown",
"category": category,
"success": False,
"error": str(result),
"error_type": "exception"
}]
return result
return {
"market_data": handle_exception(market_results, "market_data"),
"explorers": handle_exception(explorer_results, "blockchain_explorers"),
"news": handle_exception(news_results, "news"),
"sentiment": handle_exception(sentiment_results, "sentiment"),
"onchain": handle_exception(onchain_results, "onchain_analytics")
}
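# Illustrative sketch, not used by the demo above: if an individual source can
# hang, each collector coroutine could be wrapped with a timeout before being
# passed to asyncio.gather. The 30-second limit and the error shape below are
# assumptions, not behaviour defined by the collector modules.
async def collect_with_timeout(coro, category: str, timeout_s: float = 30.0) -> List[Dict[str, Any]]:
    """Await a collector coroutine, converting a timeout into an error result."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        return [{
            "provider": "Unknown",
            "category": category,
            "success": False,
            "error": f"collection timed out after {timeout_s:.0f}s",
            "error_type": "timeout"
        }]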
async def main():
"""Main demonstration function"""
print_separator("Cryptocurrency Data Collector - Comprehensive Demo")
# Collect all data
all_results = await collect_all_data()
# Print results by category
print_category_summary("Market Data Collection", all_results["market_data"])
print_category_summary("Blockchain Explorer Data", all_results["explorers"])
print_category_summary("News Data Collection", all_results["news"])
print_category_summary("Sentiment Data Collection", all_results["sentiment"])
print_category_summary("On-Chain Analytics Data", all_results["onchain"])
# Overall statistics
print_separator("Overall Collection Statistics")
total_collectors = sum(len(results) for results in all_results.values())
total_successful = sum(
sum(1 for r in results if r.get('success', False))
for results in all_results.values()
)
total_failed = total_collectors - total_successful
# Calculate average response time for successful calls
response_times = [
r.get('response_time_ms', 0)
for results in all_results.values()
for r in results
if r.get('success', False) and 'response_time_ms' in r
]
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
print(f"Total Collectors Run: {total_collectors}")
print(f"Successful: {total_successful} ({total_successful/total_collectors*100:.1f}%)")
print(f"Failed: {total_failed} ({total_failed/total_collectors*100:.1f}%)")
print(f"Average Response Time: {avg_response_time:.2f}ms")
print()
# Category breakdown
print("By Category:")
for category, results in all_results.items():
successful = sum(1 for r in results if r.get('success', False))
total = len(results)
print(f" {category:20} {successful}/{total} successful")
print_separator()
# Save results to file
output_file = f"collector_results_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
try:
with open(output_file, 'w') as f:
json.dump(all_results, f, indent=2, default=str)
print(f"Results saved to: {output_file}")
except Exception as e:
print(f"Failed to save results: {e}")
print_separator("Demo Complete")
return all_results
if __name__ == "__main__":
# Run the demonstration
results = asyncio.run(main())
# Exit with appropriate code
total_collectors = sum(len(r) for r in results.values())
total_successful = sum(
sum(1 for item in r if item.get('success', False))
for r in results.values()
)
# Exit with 0 if at least 50% successful, else 1
    sys.exit(0 if total_successful >= total_collectors / 2 else 1)