# initial_value_fixer.py
# Sample CSV values:
# apr,adjusted_apr,timestamp,portfolio_snapshot,calculation_metrics,roi,agent_id,is_dummy,address,agent_name,metric_type,first_investment_timestamp,agent_hash,volume,trading_type,selected_protocols
# -0.03,1.75,2025-05-15 21:37:27.000000,"{'portfolio': {'portfolio_value': 29.34506065817397, 'allocations': [{'chain': 'optimism', 'type': 'velodrome', 'id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'assets': ['FRAX', 'alUSD'], 'apr': 11.9, 'details': 'Velodrome Pool', 'ratio': 100.0, 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}], 'portfolio_breakdown': [{'asset': 'FRAX', 'address': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'balance': 12.498312351563191, 'price': 0.999924, 'value_usd': 12.497362479824472, 'ratio': 0.425876}, {'asset': 'alUSD', 'address': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'balance': 17.023792285753334, 'price': 0.989656, 'value_usd': 16.8476981783495, 'ratio': 0.574124}], 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}, 'positons': [{'chain': 'optimism', 'pool_address': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'dex_type': 'velodrome', 'token0': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'token1': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'token0_symbol': 'FRAX', 'token1_symbol': 'alUSD', 'apr': 11.901789131732096, 'pool_id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'is_stable': True, 'is_cl_pool': False, 'amount0': 12549523370531409633, 'amount1': 16972223462662011900, 'timestamp': 1747319387, 'status': 'open', 'tx_hash': '0xb487bb4a45bcd7bb3b9e9e3fabe76bf6594828091598ffab69704754b4c8bea8'}]}","{'initial_value': 29.353178464538146, 'final_value': 29.34506065817397, 'f_i_ratio': -0.0002765562977782299, 'last_investment_timestamp': 1747319387, 'time_ratio': 5380.753851502806}",-0.0002765562977782299,86,False,0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758,nusus-tayar25,APR,,,,,
# Parse the optimus_apr_values.csv file.
# Iterate over the rows; for each row:
#   - Parse the address and final_value
#   - Compute initial_value for the parsed address, mirroring the corresponding Optimus function
#   - Compute the APR and ROI, mirroring the corresponding Optimus functions
#   - Write the row with initial_value, APR, and ROI to a new CSV file
# (A sketch of this end-to-end driver, fix_csv, appears just above the __main__ block at the bottom of this file.)
from datetime import datetime
from decimal import Decimal
import json
import logging
import os
import time
from typing import Dict, Optional, Tuple
from pandas import DataFrame
import requests
from web3 import Web3
ETHERSCAN_API_KEY = ""
EXCLUDED_ADDRESSES = { # Testnet agents of Gaurav, Divya, and Priyanshu
"0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8", # agent_id 84
"0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5", # agent_id 86
"0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4", # agent_id 102
}
COINGECKO_PRICE_API_URL = "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}"
WHITELISTED_TOKENS = {
# Optimism tokens - stablecoins
"0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
"0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
"0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
"0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
"0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
"0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
"0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
"0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
"0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
"0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
"0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
"0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
}
COIN_ID_MAPPING = {
"usdc": "usd-coin",
"alusd": "alchemix-usd",
"usdt0": "usdt0",
"usdt": "bridged-usdt",
"usdc.e": "bridged-usd-coin-optimism",
"usx": "token-dforce-usd",
"dola": "dola-usd",
"lusd": "liquity-usd",
"dai": "makerdao-optimism-bridged-dai-optimism",
"bold": "liquity-bold",
"frax": "frax",
"sdai": "savings-dai",
"usd+": "overnight-fi-usd-optimism",
"ousdt": "openusdt",
"usdglo": "glo-dollar",
}
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/"))
def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
"""Map token symbol to CoinGecko ID."""
if chain == "optimism":
coin_id_map = {
"USDC": "usd-coin",
"ALUSD": "alchemix-usd",
"USDT0": "usdt0",
"USDT": "bridged-usdt",
"MSUSD": None,
"USDC.E": "bridged-usd-coin-optimism",
"USX": "token-dforce-usd",
"DOLA": "dola-usd",
"LUSD": "liquity-usd",
"DAI": "makerdao-optimism-bridged-dai-optimism",
"BOLD": "liquity-bold",
"FRAX": "frax",
"SDAI": "savings-dai",
"USD+": "overnight-fi-usd-optimism",
"OUSDT": "openusdt",
"USDGLO": "glo-dollar",
"ETH": "ethereum",
"WETH": "ethereum",
"WBTC": "wrapped-bitcoin",
}
return coin_id_map.get(symbol.upper())
return None
def load_cache(name: str) -> Dict:
"""Load price cache from JSON file."""
cache_file = f"{name}_cache.json"
if os.path.exists(cache_file):
try:
with open(cache_file, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
logger.warning("Cache file corrupted, creating new cache")
return {}
return {}
def save_cache(name: str, cache: Dict):
"""Save price cache to JSON file."""
cache_file = f"{name}_cache.json"
with open(cache_file, 'w') as f:
json.dump(cache, f, indent=2)
def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
"""Get price from cache if available."""
cache = load_cache(name="price")
return cache.get(date_str, {}).get(token_symbol)
def update_price_cache(date_str: str, token_symbol: str, price: float):
"""Update price cache with new value."""
cache = load_cache(name="price")
if date_str not in cache:
cache[date_str] = {}
cache[date_str][token_symbol] = price
save_cache(name="price", cache=cache)
def get_cached_request(cache_key: str) -> Optional[Dict]:
"""Get cached request response if available."""
cache = load_cache(name="request")
return cache.get(cache_key)
def update_request_cache(cache_key: str, response: Dict):
"""Update request cache with new response."""
cache = load_cache(name="request")
cache[cache_key] = response
save_cache(name="request", cache=cache)
def fetch_historical_eth_price(date_str: str) -> float:
"""Fetch historical ETH price from CoinGecko with caching."""
# Check cache first
cached_price = get_cached_price(date_str, "ETH")
if cached_price is not None:
return cached_price
try:
url = f"https://api.coingecko.com/api/v3/coins/ethereum/history"
params = {"date": date_str, "localization": "false"}
# Add delay to respect rate limits
time.sleep(1.2)
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
if "market_data" in data and "current_price" in data["market_data"]:
price = data["market_data"]["current_price"]["usd"]
# Update cache
update_price_cache(date_str, "ETH", price)
return price
return 0.0
except Exception as e:
print(f"Error fetching ETH price for {date_str}: {str(e)}")
return 0.0
def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
"""Fetch historical token price from CoinGecko with caching."""
# Check cache first
cached_price = get_cached_price(date_str, token_symbol)
if cached_price is not None:
return cached_price
try:
success, data = request_with_retries(
endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
params={"date": date_str, "localization": "false"},
)
if not success:
logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
return 0.0
# Add delay to respect rate limits
time.sleep(1.2)
if "market_data" in data and "current_price" in data["market_data"]:
price = data["market_data"]["current_price"]["usd"]
# Update cache
update_price_cache(date_str, token_symbol, price)
return price
return 0.0
except Exception as e:
print(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
return 0.0
def get_block_at_timestamp(
timestamp: int,
chain: str = "optimism"
) -> Optional[int]:
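    """Return the Optimism block number at or before `timestamp`, via Etherscan's getblocknobytime endpoint."""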
success, res = request_with_retries(
endpoint=f"https://api-optimistic.etherscan.io/api?module=block&action=getblocknobytime&timestamp={timestamp}&closest=before&apikey={ETHERSCAN_API_KEY}",
)
if success and res.get("status") == "1" and "result" in res:
return int(res.get("result"))
else:
logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}")
return None
def fetch_eth_balance(address: str, timestamp: float) -> float:
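    """Return the ETH balance of `address` (in ether) at the block nearest `timestamp`, with JSON-file caching."""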
key = "eth_balance"
cache = load_cache(name=key)
if f"{address}_{timestamp}" in cache:
return cache[f"{address}_{timestamp}"] / (10 ** 18)
balance = w3.eth.get_balance(
account=Web3.to_checksum_address(address),
block_identifier=get_block_at_timestamp(int(timestamp))
)
cache[f"{address}_{timestamp}"] = balance
save_cache(name=key, cache=cache)
return balance / (10 ** 18)
def fetch_token_balance(
address: str,
token_address: str,
timestamp: int,
decimals: int = 18
) -> Optional[float]:
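    """Return the ERC-20 balance of `address` for `token_address` (scaled by `decimals`) at the block nearest `timestamp`, with caching."""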
contract = w3.eth.contract(
address=Web3.to_checksum_address(token_address),
abi=[
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "", "type": "uint256"}],
"payable": False,
"stateMutability": "view",
"type": "function",
}
]
)
try:
cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
cache = load_cache(name="token_balance")
if cache_key in cache:
return cache[cache_key] / (10 ** decimals)
balance = contract.functions.balanceOf(address).call(block_identifier=get_block_at_timestamp(int(timestamp)))
cache[cache_key] = balance
save_cache(name="token_balance", cache=cache)
return balance / (10 ** decimals) if balance else 0.0
except Exception as e:
logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
return None
def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
"""Convert timestamp string to datetime object."""
try:
return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
except (ValueError, TypeError):
logger.warning(f"Invalid timestamp format: {timestamp}")
return None
def request_with_retries(
endpoint: str,
params: Dict = None,
headers: Dict = None,
method: str = "GET",
body: Dict = None,
rate_limited_code: int = 429,
retry_wait: int = 5,
max_retries: int = 3
) -> Tuple[bool, Dict]:
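    """Perform a GET/POST request with response caching and simple retries, backing off on HTTP 429; returns (success, parsed JSON)."""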
for attempt in range(max_retries):
try:
if method.upper() == "POST":
cache_key = f"POST_{endpoint}_{str(body or {})}"
cached_response = get_cached_request(cache_key)
if cached_response is not None:
return len(cached_response) > 0, cached_response
response = requests.post(endpoint, headers=headers, json=body)
if response.ok:
update_request_cache(cache_key, response.json())
else:
# Check cache first for GET requests
cache_key = f"{endpoint}_{str(params or {})}"
cached_response = get_cached_request(cache_key)
if cached_response is not None:
return len(cached_response) > 0, cached_response
response = requests.get(endpoint, headers=headers, params=params or {})
# Cache successful responses
if response.status_code == 200:
update_request_cache(cache_key, response.json())
elif response.status_code == 404:
update_request_cache(cache_key, {})
if response.status_code == rate_limited_code:
logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
time.sleep(retry_wait)
continue
if response.status_code != 200:
logger.error(f"Request failed with status {response.status_code}")
return False, {}
return True, response.json()
except Exception as e:
logger.error(f"Request failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(retry_wait)
continue
return False, {}
return False, {}
def should_include_transfer_optimism(
from_address: str
) -> bool:
"""Determine if an Optimism transfer should be included based on from address type."""
if not from_address:
return False
# Exclude zero address
if from_address.lower() in [
"0x0000000000000000000000000000000000000000",
"0x0",
"",
]:
return False
try:
# Use Optimism RPC to check if address is a contract
payload = {
"jsonrpc": "2.0",
"method": "eth_getCode",
"params": [from_address, "latest"],
"id": 1,
}
success, result = request_with_retries(
endpoint="https://mainnet.optimism.io",
method="POST",
headers={"Content-Type": "application/json"},
body=payload,
rate_limited_code=429,
retry_wait=5,
)
if not success:
logger.error("Failed to check contract code")
return False
code = result.get("result", "0x")
# If code is '0x', it's an EOA
if code == "0x":
return True
# If it has code, check if it's a GnosisSafe
safe_check_url = f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/"
success, _ = request_with_retries(
endpoint=safe_check_url,
headers={"Accept": "application/json"},
rate_limited_code=429,
retry_wait=5,
)
if success:
return True
logger.info(
f"Excluding transfer from contract: {from_address}"
)
return False
except Exception as e:
logger.error(f"Error checking address {from_address}: {e}")
return False
def fetch_optimism_transfers(
address: str,
last_timestamp: int
) -> Dict:
"""
Fetch Optimism transfers for a given address with improved error handling and rate limiting.
"""
base_url = "https://safe-transaction-optimism.safe.global/api/v1"
all_transfers_by_date = {}
try:
logger.info(f"Fetching Optimism transfers for address {address}...")
last_date = datetime.fromtimestamp(last_timestamp).strftime("%Y-%m-%d")
# Fetch incoming transfers
transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"
processed_count = 0
page_count = 0
max_pages = 10 # Limit to prevent infinite loops
while page_count < max_pages:
page_count += 1
logger.info(f"Fetching page {page_count} for address {address}")
success, response_json = request_with_retries(
endpoint=transfers_url,
headers={"Accept": "application/json"},
rate_limited_code=429,
retry_wait=5
)
if not success:
logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
break
transfers = response_json.get("results", [])
if not transfers:
logger.info(f"No more transfers found for address {address} on page {page_count}")
break
for transfer in transfers:
# Parse timestamp
timestamp = transfer.get("executionDate")
if not timestamp:
continue
tx_datetime = get_datetime_from_timestamp(timestamp)
tx_date = tx_datetime.strftime("%Y-%m-%d") if tx_datetime else None
if not tx_date:
continue
if tx_datetime.timestamp() > last_timestamp:
continue
# Process the transfer
from_address = transfer.get("from", address)
transfer_type = transfer.get("type", "")
if from_address.lower() == address.lower():
continue
# Initialize date in transfers dict if not exists
if tx_date not in all_transfers_by_date:
all_transfers_by_date[tx_date] = []
should_include = should_include_transfer_optimism(from_address)
if not should_include:
continue
# Process different transfer types
if transfer_type == "ERC20_TRANSFER":
# Token transfer
token_info = transfer.get("tokenInfo", {})
token_address = transfer.get("tokenAddress", "")
if not token_info:
if not token_address:
continue
# You might want to add token decimal and symbol fetching here
symbol = "Unknown"
decimals = 18
else:
symbol = token_info.get("symbol", "Unknown")
decimals = int(token_info.get("decimals", 18) or 18)
if symbol.lower() not in ["usdc", "eth"]:
continue
value_raw = int(transfer.get("value", "0") or "0")
amount = value_raw / (10**decimals)
transfer_data = {
"from_address": from_address,
"amount": amount,
"token_address": token_address,
"symbol": symbol,
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "token"
}
elif transfer_type == "ETHER_TRANSFER":
# ETH transfer
try:
value_wei = int(transfer.get("value", "0") or "0")
amount_eth = value_wei / 10**18
if amount_eth <= 0:
continue
except (ValueError, TypeError):
logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
continue
transfer_data = {
"from_address": from_address,
"amount": amount_eth,
"token_address": "",
"symbol": "ETH",
"timestamp": timestamp,
"tx_hash": transfer.get("transactionHash", ""),
"type": "eth"
}
else:
# Skip other transfer types
continue
all_transfers_by_date[tx_date].append(transfer_data)
processed_count += 1
# Show progress
if processed_count % 50 == 0:
logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")
# Check for next page
cursor = response_json.get("next")
if not cursor:
logger.info(f"No more pages for address {address}")
break
else:
transfers_url = cursor # Update URL for next page
logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
return all_transfers_by_date
except Exception as e:
logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
return {}
def calculate_initial_investment_value_from_funding_events(
transfers: Dict,
chain: str,
) -> float:
    """Sum the USD value of all funding transfers, priced at historical transfer-date prices."""
    total_investment = 0.0
    if not transfers:
        logger.info(f"No transfers found for {chain} chain")
        return 0.0
    if chain == "optimism":
        logger.info("Using Optimism-specific transfer processing")
        for date, date_transfers in transfers.items():
            for transfer in date_transfers:
                try:
                    amount = transfer.get("amount", 0)
                    token_symbol = transfer.get("symbol", "").upper()
                    if amount <= 0:
                        continue
                    # Get historical price for the transfer date
                    date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")
                    if token_symbol == "ETH":  # nosec B105
                        price = fetch_historical_eth_price(date_str)
                    else:
                        coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
                        if coingecko_id:
                            price = fetch_historical_token_price(
                                coingecko_id, date_str, token_symbol
                            )
                        else:
                            price = None
                    # Skip transfers that cannot be priced instead of multiplying by None
                    if not price:
                        logger.warning(f"No price found for {token_symbol} on {date_str}, skipping transfer")
                        continue
                    transfer_value = amount * price
                    total_investment += transfer_value
                    logger.info(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}")
                except Exception as e:
                    logger.error(f"Error processing transfer: {str(e)}")
                    continue
    else:
        logger.warning(f"Unsupported chain: {chain}, skipping")
        return 0.0
    logger.info(f"Total initial investment from {chain} chain: ${total_investment}")
    return total_investment if total_investment > 0 else 0.0
def calculate_initial_value_from_address_and_timestamp(
address: str,
final_timestamp: int,
) -> Tuple[float, int]:
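    """Fetch the address's Optimism funding transfers up to `final_timestamp` and return (initial investment in USD, earliest transfer timestamp)."""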
# First fetch the transfers
transfers = fetch_optimism_transfers(address, final_timestamp)
initial_timestamp = final_timestamp
for _transfers in transfers.values():
for _transfer in _transfers:
if "timestamp" not in _transfer:
continue
transfer_timestamp = datetime.fromisoformat(_transfer["timestamp"].replace('Z', '+00:00')).timestamp()
if transfer_timestamp < initial_timestamp:
initial_timestamp = int(transfer_timestamp)
# Then calculate initial investment
initial_investment = calculate_initial_investment_value_from_funding_events(
transfers=transfers,
chain="optimism",
)
return initial_investment, int(initial_timestamp)
def calculate_final_value_from_address_and_timestamp(
address: str,
timestamp: int,
) -> float:
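    """Value the address at `timestamp`: ETH balance plus whitelisted stablecoin balances, priced at historical USD prices."""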
eth_balance = fetch_eth_balance(address, timestamp)
eth_price = fetch_historical_eth_price(
datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y")
)
final_value = eth_balance * eth_price
for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items():
token_balance = fetch_token_balance(
address=address,
token_address=token_address,
decimals=decimals,
timestamp=timestamp,
)
token_price = fetch_historical_token_price(
coin_id=COIN_ID_MAPPING.get(symbol.lower(), symbol.lower()),
date_str=datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y"),
token_symbol=symbol
)
if token_balance is not None and token_price is not None:
token_value = token_balance * token_price
if token_value > 0:
final_value += token_value
return final_value
def _calculate_adjusted_apr(
apr: float,
initial_timestamp: int,
final_timestamp: int
) -> float:
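    """Adjust the APR for ETH price drift between the initial and final timestamps.

    The adjustment mirrors the formula used below:
        adjusted_apr = apr + (1 - final_eth_price / initial_eth_price) * 100
    so an ETH price increase over the period lowers the adjusted APR and a decrease raises it.
    """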
    if apr is None or apr == 0:
        return 0.0
    initial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y"))
    final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y"))
    # fetch_historical_eth_price returns 0.0 on failure, so guard against zero as well as None
    if final_eth_price and initial_eth_price:
        adjustment_factor = Decimal("1") - (
            Decimal(str(final_eth_price)) / Decimal(str(initial_eth_price))
        )
        adjusted_apr = round(
            float(apr) + float(adjustment_factor * Decimal("100")),
            2,
        )
        return adjusted_apr
    logger.warning(
        f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}"
    )
    return apr
def calculate_apr_and_roi(
initial_value: float,
final_value: float,
initial_timestamp: int,
final_timestamp: int
) -> Tuple[float, float, float]:
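    """Compute (apr, adjusted_apr, roi) from the initial and final portfolio values.

        roi = (final_value / initial_value - 1) * 100
        apr = roi * (8760 / hours_elapsed)

    i.e. the ROI annualized over the elapsed hours, falling back to the plain ROI when the
    annualized figure is negative (as in the code below).
    """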
if final_value <= 0:
logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
return 0.0, 0.0, 0.0
# Calculate ROI (Return on Investment)
roi = ((final_value / initial_value) - 1) * 100
# Calculate hours since investment
hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)
# Calculate time ratio (hours in a year / hours since investment)
hours_in_year = 8760
time_ratio = hours_in_year / hours
# Calculate APR (Annualized ROI)
apr = float(roi * time_ratio)
if apr < 0:
apr = roi
    adjusted_apr = _calculate_adjusted_apr(
        apr=apr,
        initial_timestamp=initial_timestamp,
        final_timestamp=final_timestamp
    )
    return float(round(apr, 2)), float(round(adjusted_apr, 2)), float(round(roi, 2))
def fix_apr_and_roi(df: DataFrame) -> DataFrame:
"""
Fix APR and ROI values by recalculating them based on actual blockchain data.
This function processes each row only once and includes proper error handling.
"""
if df.empty:
logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
return df
logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")
# Remove rows with excluded addresses
original_count = len(df)
df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
excluded_count = original_count - len(df)
if excluded_count > 0:
logger.info(f"Excluded {excluded_count} rows with excluded addresses")
# Remove rows with timestamps before 2025-06-06
original_count = len(df)
df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
old_data_count = original_count - len(df)
if old_data_count > 0:
logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")
# Check for future timestamps and filter them out
current_time = datetime.now().timestamp()
future_rows = df[df['timestamp'].apply(lambda x: x.timestamp()) > current_time]
if not future_rows.empty:
logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
for idx, row in future_rows.iterrows():
logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")
df = df[df['timestamp'].apply(lambda x: x.timestamp()) <= current_time]
if df.empty:
logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
return df
logger.info(f"Processing {len(df)} valid rows")
# Create a copy to avoid modifying the original DataFrame during iteration
df_copy = df.copy()
rows_to_drop = []
processed_count = 0
for idx, row in df_copy.iterrows():
try:
if row['is_dummy']:
logger.debug(f"Skipping dummy row {idx}")
continue
processed_count += 1
logger.info(f"Processing row {processed_count}/{len(df_copy)} - Address: {row['address']}")
final_timestamp = int(row['timestamp'].timestamp())
# Validate timestamp is not in the future
if final_timestamp > current_time:
logger.warning(f"Skipping row {idx} with future timestamp: {final_timestamp}")
rows_to_drop.append(idx)
continue
calculation_metrics = row['calculation_metrics']
# Calculate initial value and timestamp with error handling
try:
initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
row['address'], final_timestamp
)
except Exception as e:
logger.error(f"Error calculating initial value for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
# Calculate final value with error handling
try:
final_value = calculate_final_value_from_address_and_timestamp(
row['address'], final_timestamp
)
if row["volume"] > 0:
final_value += row["volume"]
except Exception as e:
logger.error(f"Error calculating final value for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
if initial_value <= 0:
logger.warning(f"Initial value for address {row['address']} is non-positive ({initial_value}), skipping row.")
rows_to_drop.append(idx)
continue
# Update calculation metrics
calculation_metrics['initial_value'] = initial_value
calculation_metrics['final_value'] = final_value
df.at[idx, 'calculation_metrics'] = calculation_metrics
# Calculate APR and ROI with error handling
try:
apr, adjusted_apr, roi = calculate_apr_and_roi(
initial_value=initial_value,
final_value=final_value,
initial_timestamp=initial_timestamp,
final_timestamp=final_timestamp
)
df.at[idx, 'apr'] = apr
df.at[idx, 'adjusted_apr'] = adjusted_apr
df.at[idx, 'roi'] = roi
logger.info(f"Successfully processed address {row['address']}: APR={apr:.2f}, ROI={roi:.2f}")
except Exception as e:
logger.error(f"Error calculating APR/ROI for address {row['address']}: {e}")
rows_to_drop.append(idx)
continue
except Exception as e:
logger.error(f"Unexpected error processing row {idx}: {e}")
rows_to_drop.append(idx)
continue
# Drop rows that had errors
if rows_to_drop:
logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
df = df.drop(rows_to_drop)
logger.info(f"Completed fix_apr_and_roi: {len(df)} rows remaining")
return df
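# A minimal sketch of the end-to-end flow described in the header comments: read the CSV,
# recompute values with fix_apr_and_roi, and write a corrected copy. fix_csv itself, the output
# file name, and the literal_eval step for calculation_metrics are illustrative assumptions,
# not part of the original script.
def fix_csv(
    input_path: str = "optimus_apr_values.csv",
    output_path: str = "optimus_apr_values_fixed.csv",
) -> None:
    import ast

    import pandas as pd

    df = pd.read_csv(input_path, parse_dates=["timestamp"])
    # calculation_metrics is serialized as a Python-dict-like string (see the sample row above),
    # and fix_apr_and_roi mutates it as a dict, so parse it first.
    df["calculation_metrics"] = df["calculation_metrics"].apply(
        lambda s: ast.literal_eval(s) if isinstance(s, str) else s
    )
    fixed = fix_apr_and_roi(df)
    fixed.to_csv(output_path, index=False)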
if __name__ == "__main__":
test_address = "0xa11417aeBF3932ee895008eDE8eA95616f488bCf"
test_final_timestamp = 1749567654
v = calculate_initial_value_from_address_and_timestamp(
test_address,
test_final_timestamp
)
print(v)