# lokiai / usage_tracker.py
import json
import os
import datetime
import threading
from collections import defaultdict
from typing import List, Dict, Any, Optional
from fastapi import Request
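
# Illustrative shape of the persisted usage_data.json written by save_data()
# (field values here are placeholders, not real data; defaultdicts are converted
# to plain dicts on save):
#
#   {
#       "requests": [
#           {"timestamp": "...", "model": "...", "endpoint": "...",
#            "ip_address": "...", "user_agent": "..."}
#       ],
#       "models": {
#           "<model>": {"total_requests": 1, "first_used": "...", "last_used": "..."}
#       },
#       "api_endpoints": {
#           "<endpoint>": {"total_requests": 1, "first_used": "...", "last_used": "..."}
#       }
#   }
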
class UsageTracker:
    """Tracks API requests and aggregates per-model and per-endpoint usage statistics."""

    def __init__(self, data_file="usage_data.json"):
        self.data_file = data_file
        self.lock = threading.Lock()
        self.data = self._load_data()
        self._schedule_save()

    def _load_data(self) -> Dict[str, Any]:
        """
        Loads usage data from the JSON file, ensuring data integrity.
        Handles cases where the file might be corrupted or in an old format.
        """
        if os.path.exists(self.data_file):
            try:
                with open(self.data_file, 'r') as f:
                    data = json.load(f)
                # Check if data is in the expected new format
                if isinstance(data, dict) and 'requests' in data and 'models' in data and 'api_endpoints' in data:
                    # Rebuild the defaultdicts so models/endpoints not yet seen can
                    # still be added after a reload without raising KeyError.
                    restored = self._initialize_empty_data()
                    restored['requests'] = data['requests']
                    restored['models'].update(data['models'])
                    restored['api_endpoints'].update(data['api_endpoints'])
                    return restored
                # If data is in an older, simpler format, convert it
                elif isinstance(data, dict) and 'total_requests' in data:  # Heuristic for old format
                    return self._convert_old_format(data)
            except (json.JSONDecodeError, TypeError) as e:
                print(f"Warning: Could not decode JSON from {self.data_file} ({e}). Starting fresh.")
        return self._initialize_empty_data()

    def _initialize_empty_data(self) -> Dict[str, Any]:
        """
        Initializes a new, empty data structure for usage tracking.
        This structure includes a list for all requests, and dictionaries
        to store aggregated data for models and API endpoints.
        """
        return {
            'requests': [],
            'models': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None}),
            'api_endpoints': defaultdict(lambda: {'total_requests': 0, 'first_used': None, 'last_used': None})
        }

    def _convert_old_format(self, old_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Converts data from the old format to the new detailed format.
        This is a crucial step to avoid data loss on updates.
        It iterates through old 'requests' (if any) and re-records them
        into the new structured format.
        """
        print("Converting old usage data format to new format.")
        new_data = self._initialize_empty_data()
        # Preserve existing requests if they follow a basic structure
        if 'requests' in old_data and isinstance(old_data['requests'], list):
            for req in old_data['requests']:
                # Attempt to extract relevant fields from the old request entry
                timestamp_str = req.get('timestamp')
                model_name = req.get('model', 'unknown_model')
                endpoint_name = req.get('endpoint', 'unknown_endpoint')
                ip_address = req.get('ip_address', 'N/A')
                user_agent = req.get('user_agent', 'N/A')
                # Ensure timestamp is valid and parseable
                try:
                    timestamp = datetime.datetime.fromisoformat(timestamp_str) if timestamp_str else datetime.datetime.now(datetime.timezone.utc)
                except ValueError:
                    timestamp = datetime.datetime.now(datetime.timezone.utc)  # Fallback if timestamp is malformed
                # Normalize naive timestamps to UTC so later comparisons with
                # timezone-aware datetimes do not raise TypeError.
                if timestamp.tzinfo is None:
                    timestamp = timestamp.replace(tzinfo=datetime.timezone.utc)
                new_data['requests'].append({
                    'timestamp': timestamp.isoformat(),
                    'model': model_name,
                    'endpoint': endpoint_name,
                    'ip_address': ip_address,
                    'user_agent': user_agent,
                })
                # Update aggregated stats for models and endpoints.
                # This ensures that even old data contributes to the new summary.
                if not new_data['models'][model_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['models'][model_name]['first_used']):
                    new_data['models'][model_name]['first_used'] = timestamp.isoformat()
                if not new_data['models'][model_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['models'][model_name]['last_used']):
                    new_data['models'][model_name]['last_used'] = timestamp.isoformat()
                new_data['models'][model_name]['total_requests'] += 1
                if not new_data['api_endpoints'][endpoint_name]['first_used'] or timestamp < datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['first_used']):
                    new_data['api_endpoints'][endpoint_name]['first_used'] = timestamp.isoformat()
                if not new_data['api_endpoints'][endpoint_name]['last_used'] or timestamp > datetime.datetime.fromisoformat(new_data['api_endpoints'][endpoint_name]['last_used']):
                    new_data['api_endpoints'][endpoint_name]['last_used'] = timestamp.isoformat()
                new_data['api_endpoints'][endpoint_name]['total_requests'] += 1
        print("Data conversion complete.")
        return new_data

    def save_data(self):
        """Saves the current usage data to the JSON file."""
        with self.lock:
            try:
                # Convert defaultdicts to regular dicts for JSON serialization
                serializable_data = {
                    'requests': self.data['requests'],
                    'models': dict(self.data['models']),
                    'api_endpoints': dict(self.data['api_endpoints'])
                }
                with open(self.data_file, 'w') as f:
                    json.dump(serializable_data, f, indent=4)
            except IOError as e:
                print(f"Error saving usage data to {self.data_file}: {e}")

    def _schedule_save(self):
        """Schedules the data to be saved every 60 seconds."""
        # threading.Timer starts a non-daemon thread by default, so a pending save
        # still runs even if the main thread exits; switching to daemon timers would
        # require a proper shutdown hook. A plain Timer call keeps this simple.
        threading.Timer(60.0, self._schedule_save).start()
        self.save_data()

    def record_request(self, request: Optional[Request] = None, model: str = "unknown", endpoint: str = "unknown"):
        """
        Records a single API request with detailed information.
        Updates both the raw request list and aggregated statistics.
        """
        with self.lock:
            now = datetime.datetime.now(datetime.timezone.utc)
            ip_address = request.client.host if request and request.client else "N/A"
            user_agent = request.headers.get("user-agent", "N/A") if request else "N/A"
            # Append to raw requests list
            self.data['requests'].append({
                'timestamp': now.isoformat(),
                'model': model,
                'endpoint': endpoint,
                'ip_address': ip_address,
                'user_agent': user_agent,
            })
            # Update model-specific stats
            model_stats = self.data['models'][model]
            model_stats['total_requests'] += 1
            if model_stats['first_used'] is None or now < datetime.datetime.fromisoformat(model_stats['first_used']):
                model_stats['first_used'] = now.isoformat()
            if model_stats['last_used'] is None or now > datetime.datetime.fromisoformat(model_stats['last_used']):
                model_stats['last_used'] = now.isoformat()
            # Update endpoint-specific stats
            endpoint_stats = self.data['api_endpoints'][endpoint]
            endpoint_stats['total_requests'] += 1
            if endpoint_stats['first_used'] is None or now < datetime.datetime.fromisoformat(endpoint_stats['first_used']):
                endpoint_stats['first_used'] = now.isoformat()
            if endpoint_stats['last_used'] is None or now > datetime.datetime.fromisoformat(endpoint_stats['last_used']):
                endpoint_stats['last_used'] = now.isoformat()
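
    # Example wiring inside a FastAPI route (illustrative only; the route path,
    # model name, and module-level `usage_tracker` instance are assumptions, not
    # part of this module):
    #
    #   @app.post("/chat/completions")
    #   async def chat_completions(request: Request):
    #       usage_tracker.record_request(request=request, model="example-model",
    #                                    endpoint="/chat/completions")
    #       ...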

    def get_usage_summary(self, days: int = 7) -> Dict[str, Any]:
        """
        Generates a comprehensive summary of usage data for the specified number of days.
        Includes total requests, model usage, endpoint usage, daily usage, and unique IPs.
        """
        with self.lock:
            summary = {
                'total_requests': 0,
                'model_usage': defaultdict(int),  # Requests per model for the period
                'endpoint_usage': defaultdict(int),  # Requests per endpoint for the period
                'daily_usage': defaultdict(lambda: {'requests': 0, 'unique_ips': set()}),  # Daily stats
                'unique_ips_total': set(),  # Unique IPs across all requests
                'recent_requests': []
            }
            # Prepare data for the model and API endpoint charts.
            # These come from the aggregated 'self.data', which covers all time;
            # the 'model_usage' and 'endpoint_usage' fields above cover only the given 'days' period.
            all_time_model_data = {
                model: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for model, stats in self.data['models'].items()
            }
            all_time_endpoint_data = {
                endpoint: {
                    'total_requests': stats['total_requests'],
                    'first_used': stats['first_used'],
                    'last_used': stats['last_used']
                } for endpoint, stats in self.data['api_endpoints'].items()
            }
            cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
            # Iterate backwards for recent requests and aggregate data for the specified period
            requests_for_period = []
            for req in reversed(self.data['requests']):
                req_time = datetime.datetime.fromisoformat(req['timestamp'])
                # Treat naive timestamps as UTC so the comparison with the aware cutoff does not fail
                if req_time.tzinfo is None:
                    req_time = req_time.replace(tzinfo=datetime.timezone.utc)
                # Always update total requests and unique IPs for all time
                summary['total_requests'] += 1
                summary['unique_ips_total'].add(req['ip_address'])
                if req_time >= cutoff_date:
                    requests_for_period.append(req)
                    date_str = req_time.strftime("%Y-%m-%d")
                    # Aggregate data for charts and tables for the given period
                    summary['model_usage'][req['model']] += 1
                    summary['endpoint_usage'][req['endpoint']] += 1
                    summary['daily_usage'][date_str]['requests'] += 1
                    summary['daily_usage'][date_str]['unique_ips'].add(req['ip_address'])
                    # Add to recent requests list (up to 20)
                    if len(summary['recent_requests']) < 20:
                        summary['recent_requests'].append(req)
            # Convert daily unique IPs set to a count
            for date_str, daily_stats in summary['daily_usage'].items():
                daily_stats['unique_ips_count'] = len(daily_stats['unique_ips'])
                del daily_stats['unique_ips']  # Remove the set before returning
            # Sort daily usage by date
            summary['daily_usage'] = dict(sorted(summary['daily_usage'].items()))
            # Convert defaultdicts to regular dicts for the final summary
            summary['model_usage_period'] = dict(summary['model_usage'])
            summary['endpoint_usage_period'] = dict(summary['endpoint_usage'])
            summary['daily_usage_period'] = dict(summary['daily_usage'])
            # Add all-time data
            summary['all_time_model_usage'] = all_time_model_data
            summary['all_time_endpoint_usage'] = all_time_endpoint_data
            summary['unique_ips_total_count'] = len(summary['unique_ips_total'])
            del summary['unique_ips_total']  # No need to send the whole set
            # Clean up defaultdicts that are not needed in the final output structure
            del summary['model_usage']
            del summary['endpoint_usage']
            del summary['daily_usage']
            return summary
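

# --- Illustrative usage (not part of the original module; a minimal sketch) ---
# The model and endpoint names below are made up for the example. record_request
# accepts request=None, in which case the IP address and user agent are stored
# as "N/A". Note that the periodic-save timer started in __init__ runs on a
# non-daemon thread, so this demo process keeps running until interrupted.
if __name__ == "__main__":
    tracker = UsageTracker(data_file="usage_data.json")
    tracker.record_request(model="example-model", endpoint="/chat/completions")
    summary = tracker.get_usage_summary(days=7)
    print(json.dumps(summary, indent=2, default=str))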