#!/usr/bin/env python3
"""
Database module for Crypto Data Aggregator
Complete CRUD operations with the exact schema specified
"""

import sqlite3
import threading
import json
import os
import shutil
from datetime import datetime
from typing import List, Dict, Optional, Any, Tuple
from contextlib import contextmanager
import logging

import config

# Setup logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT,
    handlers=[
        logging.FileHandler(config.LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class CryptoDatabase:
    """
    Database manager for cryptocurrency data with full CRUD operations
    Thread-safe implementation using context managers
    """

    def __init__(self, db_path: Optional[str] = None):
        """Initialize the database schema and thread-local connection storage"""
        self.db_path = str(db_path or config.DATABASE_PATH)
        self._local = threading.local()
        self._init_database()
        logger.info(f"Database initialized at {self.db_path}")

    @contextmanager
    def get_connection(self):
        """Yield this thread's database connection, creating it on first use"""
        if not hasattr(self._local, 'conn'):
            self._local.conn = sqlite3.connect(
                self.db_path,
                check_same_thread=False,
                timeout=30.0
            )
            self._local.conn.row_factory = sqlite3.Row

        try:
            yield self._local.conn
        except Exception as e:
            self._local.conn.rollback()
            logger.error(f"Database error: {e}")
            raise
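
    # Usage sketch (illustrative, not part of the public API): rows come back
    # as sqlite3.Row, so columns are addressable by name.
    #
    #     db = CryptoDatabase("crypto.db")
    #     with db.get_connection() as conn:
    #         row = conn.execute("SELECT 1 AS one").fetchone()
    #         assert row["one"] == 1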

    def _init_database(self):
        """Initialize all database tables with exact schema"""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # ==================== PRICES TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS prices (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    name TEXT,
                    price_usd REAL NOT NULL,
                    volume_24h REAL,
                    market_cap REAL,
                    percent_change_1h REAL,
                    percent_change_24h REAL,
                    percent_change_7d REAL,
                    rank INTEGER,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # ==================== NEWS TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS news (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    summary TEXT,
                    url TEXT UNIQUE,
                    source TEXT,
                    sentiment_score REAL,
                    sentiment_label TEXT,
                    related_coins TEXT,
                    published_date DATETIME,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # ==================== MARKET ANALYSIS TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS market_analysis (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    timeframe TEXT,
                    trend TEXT,
                    support_level REAL,
                    resistance_level REAL,
                    prediction TEXT,
                    confidence REAL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # ==================== USER QUERIES TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_queries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    query TEXT,
                    result_count INTEGER,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # ==================== MODEL OUTPUTS TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS model_outputs (
                    id TEXT PRIMARY KEY,
                    symbol TEXT NOT NULL,
                    model_key TEXT NOT NULL,
                    prediction_type TEXT NOT NULL,
                    confidence_score REAL,
                    prediction_data TEXT NOT NULL,
                    explanation TEXT,
                    metadata TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    expires_at DATETIME
                )
            """)

            # ==================== GAP FILLING AUDIT TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS gap_filling_audit (
                    id TEXT PRIMARY KEY,
                    request_id TEXT NOT NULL,
                    gap_type TEXT NOT NULL,
                    strategy_used TEXT NOT NULL,
                    success INTEGER NOT NULL DEFAULT 0,
                    confidence REAL,
                    execution_time_ms INTEGER,
                    models_attempted TEXT,
                    providers_attempted TEXT,
                    filled_fields TEXT,
                    metadata TEXT,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # ==================== PROVIDER CACHE TABLE ====================
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS provider_cache (
                    id TEXT PRIMARY KEY,
                    provider_key TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    params_hash TEXT NOT NULL,
                    response_data TEXT NOT NULL,
                    success INTEGER NOT NULL DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    expires_at DATETIME
                )
            """)

            # ==================== CREATE INDEXES ====================
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_prices_symbol ON prices(symbol)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_prices_timestamp ON prices(timestamp)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_prices_rank ON prices(rank)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_news_url ON news(url)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_news_published ON news(published_date)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_news_sentiment ON news(sentiment_label)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_analysis_symbol ON market_analysis(symbol)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_analysis_timestamp ON market_analysis(timestamp)")

            # New indexes for new tables
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_model_outputs_symbol ON model_outputs(symbol)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_model_outputs_model_key ON model_outputs(model_key)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_model_outputs_created_at ON model_outputs(created_at)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_gap_audit_gap_type ON gap_filling_audit(gap_type)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_gap_audit_request_id ON gap_filling_audit(request_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_provider_cache_provider ON provider_cache(provider_key, endpoint)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_provider_cache_params ON provider_cache(params_hash)")

            conn.commit()
            logger.info("Database tables and indexes created successfully")

    # ==================== PRICES CRUD OPERATIONS ====================

    def save_price(self, price_data: Dict[str, Any]) -> bool:
        """
        Save a single price record

        Args:
            price_data: Dictionary containing price information

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO prices
                    (symbol, name, price_usd, volume_24h, market_cap,
                     percent_change_1h, percent_change_24h, percent_change_7d, rank)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    price_data.get('symbol'),
                    price_data.get('name'),
                    price_data.get('price_usd', 0.0),
                    price_data.get('volume_24h'),
                    price_data.get('market_cap'),
                    price_data.get('percent_change_1h'),
                    price_data.get('percent_change_24h'),
                    price_data.get('percent_change_7d'),
                    price_data.get('rank')
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving price: {e}")
            return False
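
    # Illustrative price_data payload (field names match the prices schema;
    # the values below are invented):
    #
    #     db.save_price({
    #         "symbol": "BTC", "name": "Bitcoin", "price_usd": 50000.0,
    #         "volume_24h": 2.5e10, "market_cap": 1.0e12,
    #         "percent_change_1h": 0.1, "percent_change_24h": -1.2,
    #         "percent_change_7d": 3.4, "rank": 1,
    #     })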

    def save_prices_batch(self, prices: List[Dict[str, Any]]) -> int:
        """
        Save multiple price records in a single transaction
        (intended for large batches, e.g. 100+ records)

        Args:
            prices: List of price dictionaries

        Returns:
            int: Number of records saved
        """
        saved_count = 0
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                for price_data in prices:
                    try:
                        cursor.execute("""
                            INSERT INTO prices
                            (symbol, name, price_usd, volume_24h, market_cap,
                             percent_change_1h, percent_change_24h, percent_change_7d, rank)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """, (
                            price_data.get('symbol'),
                            price_data.get('name'),
                            price_data.get('price_usd', 0.0),
                            price_data.get('volume_24h'),
                            price_data.get('market_cap'),
                            price_data.get('percent_change_1h'),
                            price_data.get('percent_change_24h'),
                            price_data.get('percent_change_7d'),
                            price_data.get('rank')
                        ))
                        saved_count += 1
                    except Exception as e:
                        logger.warning(f"Error saving individual price: {e}")
                        continue
                conn.commit()
                logger.info(f"Batch saved {saved_count} price records")
        except Exception as e:
            logger.error(f"Error in batch save: {e}")
        return saved_count

    def get_latest_prices(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Get the most recent price per symbol for top cryptocurrencies

        Args:
            limit: Maximum number of records to return

        Returns:
            List of price dictionaries
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                # SQLite doesn't support DISTINCT ON, so select each symbol's
                # latest row within the last hour via a MAX(timestamp) subquery
                cursor.execute("""
                    SELECT p1.*
                    FROM prices p1
                    INNER JOIN (
                        SELECT symbol, MAX(timestamp) as max_ts
                        FROM prices
                        WHERE timestamp >= datetime('now', '-1 hour')
                        GROUP BY symbol
                    ) p2 ON p1.symbol = p2.symbol AND p1.timestamp = p2.max_ts
                    ORDER BY p1.rank ASC, p1.market_cap DESC
                    LIMIT ?
                """, (limit,))

                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting latest prices: {e}")
            return []

    def get_price_history(self, symbol: str, hours: int = 24) -> List[Dict[str, Any]]:
        """
        Get price history for a specific symbol

        Args:
            symbol: Cryptocurrency symbol
            hours: Number of hours to look back

        Returns:
            List of price dictionaries
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM prices
                    WHERE symbol = ?
                    AND timestamp >= datetime('now', '-' || ? || ' hours')
                    ORDER BY timestamp ASC
                """, (symbol, hours))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting price history: {e}")
            return []

    def get_top_gainers(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top gaining cryptocurrencies in last 24h"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT p1.*
                    FROM prices p1
                    INNER JOIN (
                        SELECT symbol, MAX(timestamp) as max_ts
                        FROM prices
                        WHERE timestamp >= datetime('now', '-1 hour')
                        GROUP BY symbol
                    ) p2 ON p1.symbol = p2.symbol AND p1.timestamp = p2.max_ts
                    WHERE p1.percent_change_24h IS NOT NULL
                    ORDER BY p1.percent_change_24h DESC
                    LIMIT ?
                """, (limit,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting top gainers: {e}")
            return []

    def delete_old_prices(self, days: int = 30) -> int:
        """
        Delete price records older than specified days

        Args:
            days: Number of days to keep

        Returns:
            Number of deleted records
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    DELETE FROM prices
                    WHERE timestamp < datetime('now', '-' || ? || ' days')
                """, (days,))
                conn.commit()
                deleted = cursor.rowcount
                logger.info(f"Deleted {deleted} old price records")
                return deleted
        except Exception as e:
            logger.error(f"Error deleting old prices: {e}")
            return 0

    # ==================== NEWS CRUD OPERATIONS ====================

    def save_news(self, news_data: Dict[str, Any]) -> bool:
        """
        Save a single news record

        Args:
            news_data: Dictionary containing news information

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                # INSERT OR IGNORE deduplicates on the UNIQUE url column
                cursor.execute("""
                    INSERT OR IGNORE INTO news
                    (title, summary, url, source, sentiment_score,
                     sentiment_label, related_coins, published_date)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    news_data.get('title'),
                    news_data.get('summary'),
                    news_data.get('url'),
                    news_data.get('source'),
                    news_data.get('sentiment_score'),
                    news_data.get('sentiment_label'),
                    json.dumps(news_data.get('related_coins', [])),
                    news_data.get('published_date')
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving news: {e}")
            return False
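
    # Illustrative news_data payload (values are made up; related_coins is
    # serialized to a JSON array in a TEXT column):
    #
    #     db.save_news({
    #         "title": "Bitcoin breaks resistance",
    #         "summary": "Short summary text",
    #         "url": "https://example.com/article",
    #         "source": "example.com",
    #         "sentiment_score": 0.8, "sentiment_label": "positive",
    #         "related_coins": ["BTC"],
    #         "published_date": "2025-11-25 12:00:00",
    #     })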

    def get_latest_news(self, limit: int = 50, sentiment: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Get latest news articles

        Args:
            limit: Maximum number of articles
            sentiment: Filter by sentiment label (optional)

        Returns:
            List of news dictionaries
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                if sentiment:
                    cursor.execute("""
                        SELECT * FROM news
                        WHERE sentiment_label = ?
                        ORDER BY published_date DESC, timestamp DESC
                        LIMIT ?
                    """, (sentiment, limit))
                else:
                    cursor.execute("""
                        SELECT * FROM news
                        ORDER BY published_date DESC, timestamp DESC
                        LIMIT ?
                    """, (limit,))

                results = []
                for row in cursor.fetchall():
                    news_dict = dict(row)
                    if news_dict.get('related_coins'):
                        try:
                            news_dict['related_coins'] = json.loads(news_dict['related_coins'])
                        except (json.JSONDecodeError, TypeError):
                            news_dict['related_coins'] = []
                    results.append(news_dict)

                return results
        except Exception as e:
            logger.error(f"Error getting latest news: {e}")
            return []

    def get_news_by_coin(self, coin: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Get news related to a specific coin"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                # Note: LIKE '%coin%' is a substring match, so short symbols
                # can also match longer ones (e.g. 'BTC' matches 'WBTC')
                cursor.execute("""
                    SELECT * FROM news
                    WHERE related_coins LIKE ?
                    ORDER BY published_date DESC
                    LIMIT ?
                """, (f'%{coin}%', limit))

                results = []
                for row in cursor.fetchall():
                    news_dict = dict(row)
                    if news_dict.get('related_coins'):
                        try:
                            news_dict['related_coins'] = json.loads(news_dict['related_coins'])
                        except (json.JSONDecodeError, TypeError):
                            news_dict['related_coins'] = []
                    results.append(news_dict)

                return results
        except Exception as e:
            logger.error(f"Error getting news by coin: {e}")
            return []

    def update_news_sentiment(self, news_id: int, sentiment_score: float, sentiment_label: str) -> bool:
        """Update sentiment for a news article"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE news
                    SET sentiment_score = ?, sentiment_label = ?
                    WHERE id = ?
                """, (sentiment_score, sentiment_label, news_id))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error updating news sentiment: {e}")
            return False

    def delete_old_news(self, days: int = 30) -> int:
        """Delete news older than specified days"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    DELETE FROM news
                    WHERE timestamp < datetime('now', '-' || ? || ' days')
                """, (days,))
                conn.commit()
                deleted = cursor.rowcount
                logger.info(f"Deleted {deleted} old news records")
                return deleted
        except Exception as e:
            logger.error(f"Error deleting old news: {e}")
            return 0

    # ==================== MARKET ANALYSIS CRUD OPERATIONS ====================

    def save_analysis(self, analysis_data: Dict[str, Any]) -> bool:
        """Save market analysis"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO market_analysis
                    (symbol, timeframe, trend, support_level, resistance_level,
                     prediction, confidence)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    analysis_data.get('symbol'),
                    analysis_data.get('timeframe'),
                    analysis_data.get('trend'),
                    analysis_data.get('support_level'),
                    analysis_data.get('resistance_level'),
                    analysis_data.get('prediction'),
                    analysis_data.get('confidence')
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving analysis: {e}")
            return False

    def get_latest_analysis(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Get latest analysis for a symbol"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM market_analysis
                    WHERE symbol = ?
                    ORDER BY timestamp DESC
                    LIMIT 1
                """, (symbol,))
                row = cursor.fetchone()
                return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error getting latest analysis: {e}")
            return None

    def get_all_analyses(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Get all market analyses"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM market_analysis
                    ORDER BY timestamp DESC
                    LIMIT ?
                """, (limit,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting all analyses: {e}")
            return []

    # ==================== USER QUERIES CRUD OPERATIONS ====================

    def log_user_query(self, query: str, result_count: int) -> bool:
        """Log a user query"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO user_queries (query, result_count)
                    VALUES (?, ?)
                """, (query, result_count))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error logging user query: {e}")
            return False

    def get_recent_queries(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent user queries"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM user_queries
                    ORDER BY timestamp DESC
                    LIMIT ?
                """, (limit,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error getting recent queries: {e}")
            return []

    # ==================== UTILITY OPERATIONS ====================

    def execute_safe_query(self, query: str, params: Tuple = ()) -> List[Dict[str, Any]]:
        """
        Execute a safe read-only query

        Args:
            query: SQL query (must start with SELECT)
            params: Query parameters

        Returns:
            List of result dictionaries
        """
        try:
            # Security: only allow SELECT queries; sqlite3's execute() also
            # refuses to run more than one statement per call
            if not query.strip().upper().startswith('SELECT'):
                logger.warning(f"Attempted non-SELECT query: {query}")
                return []

            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(query, params)
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error executing safe query: {e}")
            return []
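
    # Usage sketch (illustrative): parameters are always bound with ?, never
    # interpolated into the SQL string.
    #
    #     rows = db.execute_safe_query(
    #         "SELECT symbol, price_usd FROM prices WHERE symbol = ?", ("BTC",)
    #     )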

    def get_database_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                stats = {}

                # Count records in each table
                for table in ['prices', 'news', 'market_analysis', 'user_queries']:
                    cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
                    stats[f'{table}_count'] = cursor.fetchone()['count']

                # Get unique symbols
                cursor.execute("SELECT COUNT(DISTINCT symbol) as count FROM prices")
                stats['unique_symbols'] = cursor.fetchone()['count']

                # Get latest price update
                cursor.execute("SELECT MAX(timestamp) as latest FROM prices")
                stats['latest_price_update'] = cursor.fetchone()['latest']

                # Get latest news update
                cursor.execute("SELECT MAX(timestamp) as latest FROM news")
                stats['latest_news_update'] = cursor.fetchone()['latest']

                # Database file size
                if os.path.exists(self.db_path):
                    stats['database_size_bytes'] = os.path.getsize(self.db_path)
                    stats['database_size_mb'] = stats['database_size_bytes'] / (1024 * 1024)

                return stats
        except Exception as e:
            logger.error(f"Error getting database stats: {e}")
            return {}

    def vacuum_database(self) -> bool:
        """Vacuum database to reclaim space"""
        try:
            with self.get_connection() as conn:
                conn.execute("VACUUM")
                logger.info("Database vacuumed successfully")
                return True
        except Exception as e:
            logger.error(f"Error vacuuming database: {e}")
            return False

    def backup_database(self, backup_path: Optional[str] = None) -> bool:
        """Create database backup"""
        try:
            if backup_path is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_path = config.DATABASE_BACKUP_DIR / f"backup_{timestamp}.db"

            shutil.copy2(self.db_path, backup_path)
            logger.info(f"Database backed up to {backup_path}")
            return True
        except Exception as e:
            logger.error(f"Error backing up database: {e}")
            return False

    def close(self):
        """Close the current thread's database connection"""
        if hasattr(self._local, 'conn'):
            self._local.conn.close()
            delattr(self._local, 'conn')
            logger.info("Database connection closed")

    # ==================== MODEL OUTPUTS CRUD OPERATIONS ====================

    def save_model_output(self, output_data: Dict[str, Any]) -> bool:
        """Save AI model prediction output"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO model_outputs
                    (id, symbol, model_key, prediction_type, confidence_score,
                     prediction_data, explanation, metadata, expires_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    output_data.get('id'),
                    output_data.get('symbol'),
                    output_data.get('model_key'),
                    output_data.get('prediction_type'),
                    output_data.get('confidence_score'),
                    json.dumps(output_data.get('prediction_data', {})),
                    json.dumps(output_data.get('explanation', {})),
                    json.dumps(output_data.get('metadata', {})),
                    output_data.get('expires_at')
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving model output: {e}")
            return False

    def get_model_outputs(
        self,
        symbol: Optional[str] = None,
        model_key: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get model outputs with optional filters"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                query = "SELECT * FROM model_outputs WHERE 1=1"
                params = []

                if symbol:
                    query += " AND symbol = ?"
                    params.append(symbol)

                if model_key:
                    query += " AND model_key = ?"
                    params.append(model_key)

                query += " ORDER BY created_at DESC LIMIT ?"
                params.append(limit)

                cursor.execute(query, params)
                results = []
                for row in cursor.fetchall():
                    output_dict = dict(row)
                    # Parse JSON fields
                    if output_dict.get('prediction_data'):
                        output_dict['prediction_data'] = json.loads(output_dict['prediction_data'])
                    if output_dict.get('explanation'):
                        output_dict['explanation'] = json.loads(output_dict['explanation'])
                    if output_dict.get('metadata'):
                        output_dict['metadata'] = json.loads(output_dict['metadata'])
                    results.append(output_dict)

                return results
        except Exception as e:
            logger.error(f"Error getting model outputs: {e}")
            return []

    def delete_expired_model_outputs(self) -> int:
        """Delete expired model outputs"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    DELETE FROM model_outputs
                    WHERE expires_at IS NOT NULL
                    AND expires_at < datetime('now')
                """)
                conn.commit()
                deleted = cursor.rowcount
                logger.info(f"Deleted {deleted} expired model outputs")
                return deleted
        except Exception as e:
            logger.error(f"Error deleting expired model outputs: {e}")
            return 0

    # ==================== GAP FILLING AUDIT CRUD OPERATIONS ====================

    def save_gap_fill_audit(self, audit_data: Dict[str, Any]) -> bool:
        """Save gap filling audit record"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO gap_filling_audit
                    (id, request_id, gap_type, strategy_used, success,
                     confidence, execution_time_ms, models_attempted,
                     providers_attempted, filled_fields, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    audit_data.get('id'),
                    audit_data.get('request_id'),
                    audit_data.get('gap_type'),
                    audit_data.get('strategy_used'),
                    1 if audit_data.get('success') else 0,
                    audit_data.get('confidence'),
                    audit_data.get('execution_time_ms'),
                    json.dumps(audit_data.get('models_attempted', [])),
                    json.dumps(audit_data.get('providers_attempted', [])),
                    json.dumps(audit_data.get('filled_fields', [])),
                    json.dumps(audit_data.get('metadata', {}))
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving gap fill audit: {e}")
            return False

    def get_gap_fill_audit(
        self,
        gap_type: Optional[str] = None,
        request_id: Optional[str] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """Get gap filling audit records"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                query = "SELECT * FROM gap_filling_audit WHERE 1=1"
                params = []

                if gap_type:
                    query += " AND gap_type = ?"
                    params.append(gap_type)

                if request_id:
                    query += " AND request_id = ?"
                    params.append(request_id)

                query += " ORDER BY created_at DESC LIMIT ?"
                params.append(limit)

                cursor.execute(query, params)
                results = []
                for row in cursor.fetchall():
                    audit_dict = dict(row)
                    # Parse JSON fields
                    if audit_dict.get('models_attempted'):
                        audit_dict['models_attempted'] = json.loads(audit_dict['models_attempted'])
                    if audit_dict.get('providers_attempted'):
                        audit_dict['providers_attempted'] = json.loads(audit_dict['providers_attempted'])
                    if audit_dict.get('filled_fields'):
                        audit_dict['filled_fields'] = json.loads(audit_dict['filled_fields'])
                    if audit_dict.get('metadata'):
                        audit_dict['metadata'] = json.loads(audit_dict['metadata'])
                    results.append(audit_dict)

                return results
        except Exception as e:
            logger.error(f"Error getting gap fill audit: {e}")
            return []

    def get_gap_fill_statistics(self) -> Dict[str, Any]:
        """Get gap filling statistics"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                stats = {}

                # Total attempts
                cursor.execute("SELECT COUNT(*) as count FROM gap_filling_audit")
                stats['total_attempts'] = cursor.fetchone()['count']

                # Success rate
                cursor.execute("SELECT COUNT(*) as count FROM gap_filling_audit WHERE success = 1")
                successful = cursor.fetchone()['count']
                stats['successful_fills'] = successful
                stats['success_rate'] = successful / stats['total_attempts'] if stats['total_attempts'] > 0 else 0

                # Average confidence
                cursor.execute("SELECT AVG(confidence) as avg_conf FROM gap_filling_audit WHERE confidence IS NOT NULL")
                stats['average_confidence'] = cursor.fetchone()['avg_conf'] or 0

                # Average execution time
                cursor.execute("SELECT AVG(execution_time_ms) as avg_time FROM gap_filling_audit")
                stats['average_execution_time_ms'] = cursor.fetchone()['avg_time'] or 0

                # By gap type
                cursor.execute("""
                    SELECT gap_type, COUNT(*) as count
                    FROM gap_filling_audit
                    GROUP BY gap_type
                """)
                stats['by_gap_type'] = {row['gap_type']: row['count'] for row in cursor.fetchall()}

                # By strategy
                cursor.execute("""
                    SELECT strategy_used, COUNT(*) as count
                    FROM gap_filling_audit
                    GROUP BY strategy_used
                """)
                stats['by_strategy'] = {row['strategy_used']: row['count'] for row in cursor.fetchall()}

                return stats
        except Exception as e:
            logger.error(f"Error getting gap fill statistics: {e}")
            return {}

    # ==================== PROVIDER CACHE CRUD OPERATIONS ====================

    def save_provider_cache(self, cache_data: Dict[str, Any]) -> bool:
        """Save provider response to cache"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT OR REPLACE INTO provider_cache
                    (id, provider_key, endpoint, params_hash, response_data,
                     success, expires_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    cache_data.get('id'),
                    cache_data.get('provider_key'),
                    cache_data.get('endpoint'),
                    cache_data.get('params_hash'),
                    json.dumps(cache_data.get('response_data', {})),
                    1 if cache_data.get('success') else 0,
                    cache_data.get('expires_at')
                ))
                conn.commit()
                return True
        except Exception as e:
            logger.error(f"Error saving provider cache: {e}")
            return False

    def get_provider_cache(
        self,
        provider_key: str,
        endpoint: str,
        params_hash: str
    ) -> Optional[Dict[str, Any]]:
        """Get cached provider response"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM provider_cache
                    WHERE provider_key = ? AND endpoint = ? AND params_hash = ?
                    AND (expires_at IS NULL OR expires_at > datetime('now'))
                    ORDER BY created_at DESC
                    LIMIT 1
                """, (provider_key, endpoint, params_hash))

                row = cursor.fetchone()
                if row:
                    cache_dict = dict(row)
                    if cache_dict.get('response_data'):
                        cache_dict['response_data'] = json.loads(cache_dict['response_data'])
                    return cache_dict
                return None
        except Exception as e:
            logger.error(f"Error getting provider cache: {e}")
            return None
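
    # Round-trip sketch (illustrative; the id format, hash, and response data
    # below are invented for the example):
    #
    #     db.save_provider_cache({
    #         "id": "coingecko:/simple/price:abc123",
    #         "provider_key": "coingecko", "endpoint": "/simple/price",
    #         "params_hash": "abc123",
    #         "response_data": {"bitcoin": {"usd": 50000}},
    #         "success": True, "expires_at": "2025-01-01 00:05:00",
    #     })
    #     hit = db.get_provider_cache("coingecko", "/simple/price", "abc123")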

    def delete_expired_provider_cache(self) -> int:
        """Delete expired provider cache entries"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    DELETE FROM provider_cache
                    WHERE expires_at IS NOT NULL
                    AND expires_at < datetime('now')
                """)
                conn.commit()
                deleted = cursor.rowcount
                logger.info(f"Deleted {deleted} expired cache entries")
                return deleted
        except Exception as e:
            logger.error(f"Error deleting expired cache: {e}")
            return 0


# Singleton instance
_db_instance: Optional[CryptoDatabase] = None


def get_database() -> CryptoDatabase:
    """Get database singleton instance"""
    global _db_instance
    if _db_instance is None:
        _db_instance = CryptoDatabase()
    return _db_instance
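

# Minimal smoke-test sketch. Importing this module already requires a config
# module providing DATABASE_PATH, DATABASE_BACKUP_DIR, LOG_LEVEL, LOG_FORMAT,
# and LOG_FILE; the sample record below uses invented values.
if __name__ == "__main__":
    db = get_database()
    db.save_price({"symbol": "BTC", "name": "Bitcoin", "price_usd": 50000.0,
                   "percent_change_24h": 1.5, "rank": 1})
    print(db.get_latest_prices(limit=5))
    print(db.get_database_stats())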