Really-amin's picture
Upload 400 files
353bd57 verified
"""
Input validation and sanitization utilities for API endpoints
Prevents XSS, injection attacks, and validates data structures
"""
import re
from typing import Any, Dict, List, Optional, Union
import html
def sanitize_string(value: Any, max_length: int = 10000) -> str:
"""
Sanitize string input to prevent XSS and injection attacks
Args:
value: Input value to sanitize
max_length: Maximum allowed length
Returns:
Sanitized string
"""
if value is None:
return ""
# Convert to string
str_value = str(value)
# Truncate if too long
if len(str_value) > max_length:
str_value = str_value[:max_length]
# HTML escape to prevent XSS
sanitized = html.escape(str_value)
return sanitized
def validate_symbol(symbol: str) -> str:
"""
Validate and sanitize cryptocurrency symbol
Args:
symbol: Cryptocurrency symbol (e.g., 'BTC', 'ETH')
Returns:
Uppercase, sanitized symbol
Raises:
ValueError: If symbol is invalid
"""
if not symbol:
raise ValueError("Symbol is required")
# Remove whitespace and convert to uppercase
symbol = symbol.strip().upper()
# Validate format: 2-10 alphanumeric characters
if not re.match(r'^[A-Z0-9]{2,10}$', symbol):
raise ValueError(f"Invalid symbol format: {symbol}")
return symbol
def validate_limit(limit: Any, default: int = 50, max_limit: int = 1000) -> int:
"""
Validate and sanitize limit parameter
Args:
limit: Limit value (can be int, str, or None)
default: Default value if limit is invalid
max_limit: Maximum allowed limit
Returns:
Validated integer limit
"""
if limit is None:
return default
try:
limit_int = int(limit)
except (ValueError, TypeError):
return default
# Ensure limit is within bounds
if limit_int < 1:
return default
if limit_int > max_limit:
return max_limit
return limit_int
def validate_timeframe(timeframe: str, default: str = "1D") -> str:
"""
Validate timeframe parameter
Args:
timeframe: Timeframe string (e.g., '1D', '7D', '1h', '4h')
default: Default timeframe if invalid
Returns:
Validated timeframe
"""
if not timeframe:
return default
timeframe = timeframe.strip().upper()
# Valid timeframes
valid_timeframes = ['1D', '7D', '30D', '1Y', '1H', '4H', '1W', '1M']
if timeframe in valid_timeframes:
return timeframe
return default
def sanitize_dict(data: Dict[str, Any], max_depth: int = 10) -> Dict[str, Any]:
"""
Recursively sanitize dictionary values
Args:
data: Dictionary to sanitize
max_depth: Maximum recursion depth
Returns:
Sanitized dictionary
"""
if max_depth <= 0:
return {}
sanitized = {}
for key, value in data.items():
# Sanitize key
safe_key = sanitize_string(str(key), max_length=100)
# Sanitize value based on type
if isinstance(value, str):
sanitized[safe_key] = sanitize_string(value)
elif isinstance(value, dict):
sanitized[safe_key] = sanitize_dict(value, max_depth - 1)
elif isinstance(value, list):
sanitized[safe_key] = sanitize_list(value, max_depth - 1)
elif isinstance(value, (int, float, bool)):
sanitized[safe_key] = value
else:
sanitized[safe_key] = sanitize_string(str(value))
return sanitized
def sanitize_list(data: List[Any], max_depth: int = 10) -> List[Any]:
"""
Recursively sanitize list values
Args:
data: List to sanitize
max_depth: Maximum recursion depth
Returns:
Sanitized list
"""
if max_depth <= 0:
return []
sanitized = []
for item in data:
if isinstance(item, str):
sanitized.append(sanitize_string(item))
elif isinstance(item, dict):
sanitized.append(sanitize_dict(item, max_depth - 1))
elif isinstance(item, list):
sanitized.append(sanitize_list(item, max_depth - 1))
elif isinstance(item, (int, float, bool)):
sanitized.append(item)
else:
sanitized.append(sanitize_string(str(item)))
return sanitized
def validate_ohlcv_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Validate OHLCV data structure
Args:
data: List of OHLCV records
Returns:
Validated and sanitized OHLCV data
Raises:
ValueError: If data structure is invalid
"""
if not isinstance(data, list):
raise ValueError("OHLCV data must be a list")
validated = []
required_fields = ['timestamp', 'open', 'high', 'low', 'close']
for record in data:
if not isinstance(record, dict):
continue
# Check required fields
missing_fields = [field for field in required_fields if field not in record]
if missing_fields:
continue # Skip invalid records
# Validate numeric fields
try:
validated_record = {
'timestamp': int(record['timestamp']),
'open': float(record['open']),
'high': float(record['high']),
'low': float(record['low']),
'close': float(record['close']),
'volume': float(record.get('volume', 0))
}
# Validate price logic: high >= low, high >= open, high >= close, low <= open, low <= close
if (validated_record['high'] >= validated_record['low'] and
validated_record['high'] >= validated_record['open'] and
validated_record['high'] >= validated_record['close'] and
validated_record['low'] <= validated_record['open'] and
validated_record['low'] <= validated_record['close']):
validated.append(validated_record)
except (ValueError, TypeError):
continue # Skip invalid numeric values
return validated
def validate_coin_data(coin: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate cryptocurrency coin data structure
Args:
coin: Coin data dictionary
Returns:
Validated coin data
Raises:
ValueError: If required fields are missing
"""
if not isinstance(coin, dict):
raise ValueError("Coin data must be a dictionary")
required_fields = ['symbol', 'name']
missing_fields = [field for field in required_fields if field not in coin]
if missing_fields:
raise ValueError(f"Missing required fields: {', '.join(missing_fields)}")
# Sanitize string fields
validated = {
'symbol': validate_symbol(coin['symbol']),
'name': sanitize_string(coin['name'], max_length=100),
'price': float(coin.get('price', coin.get('current_price', 0))),
'market_cap': float(coin.get('market_cap', 0)),
'volume_24h': float(coin.get('volume_24h', coin.get('total_volume', 0))),
'change_24h': float(coin.get('change_24h', coin.get('price_change_percentage_24h', 0)))
}
return validated