# Sample CSV values:
# apr,adjusted_apr,timestamp,portfolio_snapshot,calculation_metrics,roi,agent_id,is_dummy,address,agent_name,metric_type,first_investment_timestamp,agent_hash,volume,trading_type,selected_protocols
# -0.03,1.75,2025-05-15 21:37:27.000000,"{'portfolio': {'portfolio_value': 29.34506065817397, 'allocations': [{'chain': 'optimism', 'type': 'velodrome', 'id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'assets': ['FRAX', 'alUSD'], 'apr': 11.9, 'details': 'Velodrome Pool', 'ratio': 100.0, 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}], 'portfolio_breakdown': [{'asset': 'FRAX', 'address': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'balance': 12.498312351563191, 'price': 0.999924, 'value_usd': 12.497362479824472, 'ratio': 0.425876}, {'asset': 'alUSD', 'address': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'balance': 17.023792285753334, 'price': 0.989656, 'value_usd': 16.8476981783495, 'ratio': 0.574124}], 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}, 'positons': [{'chain': 'optimism', 'pool_address': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'dex_type': 'velodrome', 'token0': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'token1': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'token0_symbol': 'FRAX', 'token1_symbol': 'alUSD', 'apr': 11.901789131732096, 'pool_id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'is_stable': True, 'is_cl_pool': False, 'amount0': 12549523370531409633, 'amount1': 16972223462662011900, 'timestamp': 1747319387, 'status': 'open', 'tx_hash': '0xb487bb4a45bcd7bb3b9e9e3fabe76bf6594828091598ffab69704754b4c8bea8'}]}","{'initial_value': 29.353178464538146, 'final_value': 29.34506065817397, 'f_i_ratio': -0.0002765562977782299, 'last_investment_timestamp': 1747319387, 'time_ratio': 5380.753851502806}",-0.0002765562977782299,86,False,0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758,nusus-tayar25,APR,,,,,

# Parse the optimus_apr_values.csv file
# Iterate on the rows: For each row:
#   Parse address, final_value
#   Compute initial_value using the parsed address similar to an Optimus function
#   Compute the APR and ROI similar to an Optimus function
#   Write the row with initial_value, APR, and ROI to a new CSV file

from datetime import datetime, timezone
from decimal import Decimal
import json
import logging
import os
import time
from typing import Dict, Optional, Tuple, List

from pandas import DataFrame
import requests
from web3 import Web3

# The key can be supplied through the environment; the empty-string default
# matches the previous hard-coded value.
ETHERSCAN_API_KEY = os.environ.get("ETHERSCAN_API_KEY", "")

EXCLUDED_ADDRESSES = {
    # Testnet agents of Gaurav, Divya, and Priyanshu
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8",  # agent_id 84
    "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5",  # agent_id 86
    "0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4",  # agent_id 102
}

# NOTE: the original template ended in "{date}}", whose unmatched brace makes
# str.format() raise ValueError.
COINGECKO_PRICE_API_URL = (
    "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}"
)

# token address -> (symbol, decimals)
WHITELISTED_TOKENS = {
    # Optimism tokens - stablecoins
    "0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6),
    "0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6),
    "0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6),
    "0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6),
    "0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18),
    "0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18),
    "0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18),
    "0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18),
    "0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18),
    "0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18),
    "0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6),
    "0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18),
    "0xFC2E6e6BCbd49ccf3A5f029c79984372DcBFE527": ("OLAS", 18),
}

# lower-cased token symbol -> CoinGecko coin id
COIN_ID_MAPPING = {
    "usdc": "usd-coin",
    "alusd": "alchemix-usd",
    "usdt0": "usdt0",
    "usdt": "bridged-usdt",
    "usdc.e": "bridged-usd-coin-optimism",
    "usx": "token-dforce-usd",
    "dola": "dola-usd",
    "lusd": "liquity-usd",
    "dai": "makerdao-optimism-bridged-dai-optimism",
    "bold": "liquity-bold",
    "frax": "frax",
    "sdai": "savings-dai",
    "usd+": "overnight-fi-usd-optimism",
    "ousdt": "openusdt",
    "usdglo": "glo-dollar",
    "olas": "autonolas",
}

ZERO_ADDRESS = "0x0000000000000000000000000000000000000000"

# Default timeout (seconds) for outbound HTTP requests, so that a stalled
# endpoint cannot hang the whole run.
REQUEST_TIMEOUT_SECONDS = 30

# upper-cased token symbol -> CoinGecko coin id, for tokens on Optimism.
# Built once at import time instead of on every lookup.
_OPTIMISM_COIN_IDS = {
    "USDC": "usd-coin",
    "ALUSD": "alchemix-usd",
    "USDT0": "usdt0",
    "USDT": "bridged-usdt",
    "MSUSD": None,
    "USDC.E": "bridged-usd-coin-optimism",
    "USX": "token-dforce-usd",
    "DOLA": "dola-usd",
    "LUSD": "liquity-usd",
    "DAI": "makerdao-optimism-bridged-dai-optimism",
    "BOLD": "liquity-bold",
    "FRAX": "frax",
    "SDAI": "savings-dai",
    "USD+": "overnight-fi-usd-optimism",
    "OUSDT": "openusdt",
    "USDGLO": "glo-dollar",
    "ETH": "ethereum",
    "WETH": "ethereum",
    "WBTC": "wrapped-bitcoin",
}

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/"))


def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]:
    """Map a token symbol to its CoinGecko ID.

    Only the "optimism" chain is supported; any other chain, or an unknown
    symbol, yields None. The symbol lookup is case-insensitive.
    """
    if chain == "optimism":
        return _OPTIMISM_COIN_IDS.get(symbol.upper())
    return None


def load_cache(name: str) -> Dict:
    """Load the JSON cache stored in ``{name}_cache.json``.

    Returns an empty dict when the file does not exist or is not valid JSON.
    """
    cache_file = f"{name}_cache.json"
    if not os.path.exists(cache_file):
        return {}
    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError:
        logger.warning("Cache file corrupted, creating new cache")
        return {}


def save_cache(name: str, cache: Dict):
    """Write ``cache`` to ``{name}_cache.json``, replacing any previous content."""
    cache_file = f"{name}_cache.json"
    with open(cache_file, "w", encoding="utf-8") as f:
        json.dump(cache, f, indent=2)


def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]:
    """Return the cached price of ``token_symbol`` on ``date_str``, or None."""
    cache = load_cache(name="price")
    return cache.get(date_str, {}).get(token_symbol)


def update_price_cache(date_str: str, token_symbol: str, price: float):
    """Store ``price`` for ``token_symbol`` on ``date_str`` in the price cache."""
    cache = load_cache(name="price")
    cache.setdefault(date_str, {})[token_symbol] = price
    save_cache(name="price", cache=cache)


def get_cached_request(cache_key: str) -> Optional[Dict]:
    """Return the cached response stored under ``cache_key``, or None."""
    cache = load_cache(name="request")
    return cache.get(cache_key)


def update_request_cache(cache_key: str, response: Dict):
    """Store ``response`` under ``cache_key`` in the request cache."""
    cache = load_cache(name="request")
    cache[cache_key] = response
    save_cache(name="request", cache=cache)


def fetch_historical_eth_price(date_str: str) -> float:
    """Fetch the historical ETH/USD price from CoinGecko with caching.

    Delegates to ``fetch_historical_token_price`` so ETH goes through the same
    price cache (symbol "ETH"), retry handling and timeout as every other
    token instead of a separate unguarded ``requests.get``.

    Args:
        date_str: Date in CoinGecko's ``dd-mm-YYYY`` format.

    Returns:
        The USD price, or 0.0 when it could not be fetched.
    """
    return fetch_historical_token_price("ethereum", date_str, "ETH")


def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float:
    """Fetch a historical USD token price from CoinGecko with caching.

    Args:
        coin_id: CoinGecko coin identifier (e.g. "usd-coin").
        date_str: Date in CoinGecko's ``dd-mm-YYYY`` format.
        token_symbol: Symbol used as the key inside the local price cache.

    Returns:
        The USD price, or 0.0 when the request fails or the response carries
        no USD price.
    """
    # Check cache first
    cached_price = get_cached_price(date_str, token_symbol)
    if cached_price is not None:
        return cached_price

    try:
        success, data = request_with_retries(
            endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history",
            params={"date": date_str, "localization": "false"},
        )
        if not success:
            logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}")
            return 0.0

        # Add delay to respect rate limits
        time.sleep(1.2)

        price = data.get("market_data", {}).get("current_price", {}).get("usd")
        if price is None:
            return 0.0

        update_price_cache(date_str, token_symbol, price)
        return price
    except Exception as e:
        logger.error(f"Error fetching price for {coin_id} on {date_str}: {str(e)}")
        return 0.0


def get_block_at_timestamp(
    timestamp: int,
    chain: str = "optimism"
) -> Optional[int]:
    """Resolve the last Optimism block mined at or before ``timestamp``.

    Args:
        timestamp: Unix timestamp in seconds.
        chain: Only used in the error message; the Optimism Etherscan
            endpoint is always queried.

    Returns:
        The block number, or None when the lookup fails.
    """
    # The original URL contained "getblocknobytime×tamp=", i.e. the
    # "&timestamp" separator had been mangled, so the timestamp parameter was
    # never sent.
    endpoint = (
        "https://api-optimistic.etherscan.io/api"
        f"?module=block&action=getblocknobytime&timestamp={timestamp}"
        f"&closest=before&apikey={ETHERSCAN_API_KEY}"
    )
    success, res = request_with_retries(endpoint=endpoint)
    if success and res.get("status") == "1" and "result" in res:
        return int(res.get("result"))

    logger.error(
        f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}"
    )
    return None


def fetch_eth_balance(address: str, timestamp: float) -> float:
    """Return the ETH balance of ``address`` at ``timestamp``, in ETH.

    Raw wei balances are cached on disk per (address, timestamp).
    """
    cache_name = "eth_balance"
    cache_key = f"{address}_{timestamp}"
    cache = load_cache(name=cache_name)
    if cache_key in cache:
        return cache[cache_key] / (10 ** 18)

    block_number = get_block_at_timestamp(int(timestamp))
    if block_number is None:
        # NOTE(review): with block_identifier=None web3 presumably falls back
        # to its default block, so the value may not be historical.
        logger.warning(
            f"Could not resolve block for timestamp {timestamp}; ETH balance of {address} may not be historical"
        )

    balance = w3.eth.get_balance(
        account=Web3.to_checksum_address(address),
        block_identifier=block_number,
    )

    # Only persist balances tied to a resolved block, so a failed block lookup
    # cannot poison the cache entry for this timestamp.
    if block_number is not None:
        cache[cache_key] = balance
        save_cache(name=cache_name, cache=cache)
    return balance / (10 ** 18)


def fetch_token_balance(
    address: str,
    token_address: str,
    timestamp: int,
    decimals: int = 18
) -> Optional[float]:
    """Return the ERC20 balance of ``address`` at ``timestamp``.

    Args:
        address: Holder address.
        token_address: ERC20 contract address.
        timestamp: Unix timestamp in seconds.
        decimals: Token decimals used to scale the raw balance.

    Returns:
        The balance in token units, or None when the lookup fails.
    """
    contract = w3.eth.contract(
        address=Web3.to_checksum_address(token_address),
        abi=[
            {
                "constant": True,
                "inputs": [{"name": "_owner", "type": "address"}],
                "name": "balanceOf",
                "outputs": [{"name": "", "type": "uint256"}],
                "payable": False,
                "stateMutability": "view",
                "type": "function",
            }
        ]
    )

    try:
        cache_key = f"token_balance_{address}_{token_address}_{timestamp}"
        cache = load_cache(name="token_balance")
        if cache_key in cache:
            return cache[cache_key] / (10 ** decimals)

        block_number = get_block_at_timestamp(int(timestamp))
        if block_number is None:
            logger.warning(
                f"Could not resolve block for timestamp {timestamp}; token balance of {address} may not be historical"
            )

        # Checksum the owner address, consistent with fetch_eth_balance.
        balance = contract.functions.balanceOf(
            Web3.to_checksum_address(address)
        ).call(block_identifier=block_number)

        # Same cache-poisoning guard as in fetch_eth_balance.
        if block_number is not None:
            cache[cache_key] = balance
            save_cache(name="token_balance", cache=cache)
        return balance / (10 ** decimals) if balance else 0.0
    except Exception as e:
        logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}")
        return None


def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]:
    """Convert an ISO-8601 timestamp string to a datetime, or None if invalid.

    A trailing "Z" is rewritten to "+00:00" so that ``fromisoformat`` accepts
    it on every supported Python version.
    """
    try:
        return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        logger.warning(f"Invalid timestamp format: {timestamp}")
        return None


def request_with_retries(
    endpoint: str,
    params: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    method: str = "GET",
    body: Optional[Dict] = None,
    rate_limited_code: int = 429,
    retry_wait: int = 5,
    max_retries: int = 3,
    timeout: float = REQUEST_TIMEOUT_SECONDS,
) -> Tuple[bool, Dict]:
    """Perform a cached HTTP request with retries.

    Responses with status 200 are stored in the on-disk request cache; for
    GET requests a 404 is cached as an empty dict. A cached entry is returned
    without touching the network and counts as a success only when non-empty.

    Args:
        endpoint: URL to call.
        params: Query parameters (GET only).
        headers: Request headers.
        method: "GET" or "POST" (case-insensitive); anything else is a GET.
        body: JSON body (POST only).
        rate_limited_code: Status code that triggers a wait-and-retry.
        retry_wait: Seconds to sleep before retrying.
        max_retries: Maximum number of attempts.
        timeout: Per-request timeout in seconds.

    Returns:
        ``(success, json_payload)``; the payload is ``{}`` on failure.
    """
    is_post = method.upper() == "POST"
    if is_post:
        cache_key = f"POST_{endpoint}_{str(body or {})}"
    else:
        cache_key = f"{endpoint}_{str(params or {})}"

    for attempt in range(max_retries):
        try:
            cached_response = get_cached_request(cache_key)
            if cached_response is not None:
                return len(cached_response) > 0, cached_response

            if is_post:
                response = requests.post(
                    endpoint, headers=headers, json=body, timeout=timeout
                )
            else:
                response = requests.get(
                    endpoint, headers=headers, params=params or {}, timeout=timeout
                )

            if response.status_code == rate_limited_code:
                logger.warning(f"Rate limited. Waiting {retry_wait} seconds...")
                time.sleep(retry_wait)
                continue

            if response.status_code != 200:
                if not is_post and response.status_code == 404:
                    # Remember "not found" so the same lookup is not repeated.
                    update_request_cache(cache_key, {})
                logger.error(f"Request failed with status {response.status_code}")
                return False, {}

            payload = response.json()
            update_request_cache(cache_key, payload)
            return True, payload

        except Exception as e:
            logger.error(f"Request failed: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(retry_wait)
                continue
            return False, {}

    return False, {}


def should_include_transfer_optimism(
    from_address: str
) -> bool:
    """Decide whether an Optimism transfer counts, based on its sender.

    Transfers from the zero address are excluded. Senders without contract
    code (EOAs) are included. Senders with code are included only when the
    Safe transaction service answers successfully for that address.
    """
    if not from_address:
        return False

    # Exclude zero address
    if from_address.lower() in (ZERO_ADDRESS, "0x0"):
        return False

    try:
        # Use Optimism RPC to check if address is a contract
        payload = {
            "jsonrpc": "2.0",
            "method": "eth_getCode",
            "params": [from_address, "latest"],
            "id": 1,
        }

        success, result = request_with_retries(
            endpoint="https://mainnet.optimism.io",
            method="POST",
            headers={"Content-Type": "application/json"},
            body=payload,
            rate_limited_code=429,
            retry_wait=5,
        )

        if not success:
            logger.error("Failed to check contract code")
            return False

        code = result.get("result", "0x")

        # If code is '0x', it's an EOA
        if code == "0x":
            return True

        # If it has code, check if it's a GnosisSafe
        safe_check_url = f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/"
        success, _ = request_with_retries(
            endpoint=safe_check_url,
            headers={"Accept": "application/json"},
            rate_limited_code=429,
            retry_wait=5,
        )

        if success:
            return True

        logger.info(
            f"Excluding transfer from contract: {from_address}"
        )
        return False

    except Exception as e:
        logger.error(f"Error checking address {from_address}: {e}")
        return False


def _build_incoming_transfer(
    transfer: Dict, from_address: str, timestamp: str
) -> Optional[Dict]:
    """Normalise one Safe-API incoming transfer, or return None to skip it."""
    transfer_type = transfer.get("type", "")

    if transfer_type == "ERC20_TRANSFER":
        token_info = transfer.get("tokenInfo", {})
        token_address = transfer.get("tokenAddress", "")

        if not token_info:
            if not token_address:
                return None
            # You might want to add token decimal and symbol fetching here
            symbol = "Unknown"
            decimals = 18
        else:
            symbol = token_info.get("symbol", "Unknown")
            decimals = int(token_info.get("decimals", 18) or 18)

        if symbol.lower() not in ["usdc", "eth"]:
            return None

        try:
            value_raw = int(transfer.get("value", "0") or "0")
        except (ValueError, TypeError):
            # A single malformed value must not abort the whole fetch.
            logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
            return None

        return {
            "from_address": from_address,
            "amount": value_raw / (10**decimals),
            "token_address": token_address,
            "symbol": symbol,
            "timestamp": timestamp,
            "tx_hash": transfer.get("transactionHash", ""),
            "type": "token",
        }

    if transfer_type == "ETHER_TRANSFER":
        try:
            value_wei = int(transfer.get("value", "0") or "0")
        except (ValueError, TypeError):
            logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}")
            return None

        amount_eth = value_wei / 10**18
        if amount_eth <= 0:
            return None

        return {
            "from_address": from_address,
            "amount": amount_eth,
            "token_address": "",
            "symbol": "ETH",
            "timestamp": timestamp,
            "tx_hash": transfer.get("transactionHash", ""),
            "type": "eth",
        }

    # Skip other transfer types
    return None


def fetch_optimism_incoming_transfers(
    address: str,
    last_timestamp: int
) -> Dict:
    """Fetch incoming transfers of a Safe on Optimism up to ``last_timestamp``.

    Only ETH transfers and ERC20 transfers whose symbol is USDC or ETH are
    kept, and only when the sender passes
    ``should_include_transfer_optimism``. At most 10 result pages are read.

    Args:
        address: The safe address to fetch transfers for.
        last_timestamp: Unix timestamp; later transfers are ignored.

    Returns:
        Dict mapping ``YYYY-MM-DD`` to the list of transfers executed that
        day, or ``{}`` when an unexpected error occurs. Dates without any
        accepted transfer are not present.
    """
    base_url = "https://safe-transaction-optimism.safe.global/api/v1"
    all_transfers_by_date: Dict[str, List[Dict]] = {}

    try:
        logger.info(f"Fetching Optimism transfers for address {address}...")

        transfers_url = f"{base_url}/safes/{address}/incoming-transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 10  # Limit to prevent infinite loops

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            logger.debug(f"incoming transfers {response_json}")

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                tx_datetime = get_datetime_from_timestamp(timestamp)
                if tx_datetime is None:
                    continue

                if tx_datetime.timestamp() > last_timestamp:
                    continue

                # "from" may be present but null; fall back to the safe itself,
                # which is then skipped as a self-transfer.
                from_address = transfer.get("from") or address
                if from_address.lower() == address.lower():
                    continue

                # Run the cheap local filtering before the network-backed
                # sender check.
                transfer_data = _build_incoming_transfer(transfer, from_address, timestamp)
                if transfer_data is None:
                    continue

                if not should_include_transfer_optimism(from_address):
                    continue

                tx_date = tx_datetime.strftime("%Y-%m-%d")
                all_transfers_by_date.setdefault(tx_date, []).append(transfer_data)
                processed_count += 1

                # Show progress
                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} Optimism transfers for address {address}...")

            # Check for next page
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor  # Update URL for next page

        logger.info(f"Completed Optimism transfers for address {address}: {processed_count} found")
        return all_transfers_by_date

    except Exception as e:
        logger.error(f"Error fetching Optimism transfers for address {address}: {e}")
        return {}


def _build_outgoing_transfer(
    transfer: Dict, address: str, timestamp: str
) -> Optional[Dict]:
    """Normalise one Safe-API outgoing transfer, or return None to skip it."""
    transfer_type = transfer.get("type", "")

    if transfer_type == "ETHER_TRANSFER":
        try:
            value_wei = int(transfer.get("value", "0") or "0")
        except (ValueError, TypeError):
            return None

        amount_eth = value_wei / 10**18
        if amount_eth <= 0:
            return None

        return {
            "from_address": address,
            "to_address": transfer.get("to"),
            "amount": amount_eth,
            "token_address": ZERO_ADDRESS,
            "symbol": "ETH",
            "timestamp": timestamp,
            "tx_hash": transfer.get("transactionHash", ""),
            "type": "eth",
        }

    if transfer_type == "ERC20_TRANSFER":
        token_info = transfer.get("tokenInfo", {})
        if token_info:
            symbol = token_info.get("symbol", "Unknown")
            decimals = int(token_info.get("decimals", 18) or 18)
        else:
            symbol = "Unknown"
            decimals = 18

        try:
            value_raw = int(transfer.get("value", "0") or "0")
        except (ValueError, TypeError):
            return None

        amount = value_raw / (10**decimals)
        if amount <= 0:
            return None

        return {
            "from_address": transfer.get("from"),
            "to_address": transfer.get("to"),
            "amount": amount,
            "token_address": transfer.get("tokenAddress", ""),
            "symbol": symbol,
            "timestamp": timestamp,
            "tx_hash": transfer.get("transactionHash", ""),
            "type": "token",
        }

    return None


def fetch_optimism_outgoing_transfers(
    address: str,
    final_timestamp: int,
    from_address: str,
) -> Dict:
    """Fetch all outgoing transfers from the safe address on Optimism until a specific date.

    Each transfer is recorded exactly once, as either an ETH or an ERC20
    entry. The previous implementation fell through after recording an ETH
    transfer and recorded it a second time as an "Unknown" token.

    Args:
        address: The safe address to fetch transfers for
        final_timestamp: The timestamp until which to fetch transfers
        from_address: The master address to check for reversions. Currently
            unused here; reversions are matched later in
            ``track_and_calculate_reversion_value``. Kept for interface
            compatibility.

    Returns:
        Dict: Dictionary of transfers organized by date (``YYYY-MM-DD``), or
        ``{}`` when an unexpected error occurs.
    """
    all_transfers: Dict[str, List[Dict]] = {}

    if not address:
        logger.warning(
            "No address provided for fetching Optimism outgoing transfers"
        )
        return all_transfers

    try:
        # Use SafeGlobal API for Optimism transfers
        base_url = "https://safe-transaction-optimism.safe.global/api/v1"
        transfers_url = f"{base_url}/safes/{address}/transfers/"

        processed_count = 0
        page_count = 0
        max_pages = 50  # Increased limit to handle more transactions

        while page_count < max_pages:
            page_count += 1
            logger.info(f"Fetching outgoing transfers page {page_count} for address {address}")

            success, response_json = request_with_retries(
                endpoint=transfers_url,
                headers={"Accept": "application/json"},
                rate_limited_code=429,
                retry_wait=5,
            )

            if not success:
                logger.error(f"Failed to fetch Optimism transfers for address {address} on page {page_count}")
                break

            transfers = response_json.get("results", [])
            if not transfers:
                logger.info(f"No more transfers found for address {address} on page {page_count}")
                break

            logger.debug(f"outgoing_transfers {response_json}")

            for transfer in transfers:
                timestamp = transfer.get("executionDate")
                if not timestamp:
                    continue

                tx_datetime = get_datetime_from_timestamp(timestamp)
                if tx_datetime is None:
                    continue

                if tx_datetime.timestamp() > final_timestamp:
                    continue

                # Only process outgoing transfers (sender is our safe address).
                # "from" may be null, so guard before lower-casing.
                if (transfer.get("from") or "").lower() != address.lower():
                    continue

                transfer_data = _build_outgoing_transfer(transfer, address, timestamp)
                if transfer_data is None:
                    continue

                tx_date = tx_datetime.strftime("%Y-%m-%d")
                all_transfers.setdefault(tx_date, []).append(transfer_data)
                processed_count += 1

                # Show progress
                if processed_count % 50 == 0:
                    logger.info(f"Processed {processed_count} outgoing transfers for address {address}...")

            # Check for next page
            cursor = response_json.get("next")
            if not cursor:
                logger.info(f"No more pages for address {address}")
                break
            transfers_url = cursor  # Update URL for next page

        logger.info(f"Completed Optimism outgoing transfers: {processed_count} found")
        return all_transfers

    except Exception as e:
        logger.error(f"Error fetching Optimism outgoing transfers: {e}")
        return {}


def _flatten_transfers_by_timestamp(transfers_by_date: Dict) -> List[Dict]:
    """Flatten a date -> transfers mapping into one list sorted by timestamp."""
    flattened = [
        transfer
        for transfers in transfers_by_date.values()
        for transfer in transfers
        if isinstance(transfer, dict) and "timestamp" in transfer
    ]
    flattened.sort(key=lambda x: x["timestamp"])
    return flattened


def track_and_calculate_reversion_value(
    safe_address: str,
    chain: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Compute the USD value of ETH sent back from the safe to its funder.

    The sender of the earliest incoming ETH transfer is treated as the master
    address. Outgoing ETH transfers from ``safe_address`` to that address are
    reversions, valued by ``calculate_total_reversion_value``.

    Returns:
        The reversion value in USD, or 0.0 when there is nothing to revert or
        an error occurs.
    """
    try:
        if not incoming_transfers:
            logger.warning(f"No transfers found for {chain} chain")
            return 0.0

        sorted_incoming_transfers = _flatten_transfers_by_timestamp(incoming_transfers)
        sorted_outgoing_transfers = _flatten_transfers_by_timestamp(outgoing_transfers)

        eth_transfers = []
        master_safe_address = None
        initial_funding_seen = False

        for transfer in sorted_incoming_transfers:
            if transfer.get("symbol") != "ETH":
                continue

            if not initial_funding_seen:
                # The first ETH transfer is the initial funding.
                initial_funding_seen = True
                if transfer.get("from_address"):
                    master_safe_address = transfer.get("from_address").lower()
                eth_transfers.append(transfer)
            elif (transfer.get("from_address") or "").lower() == master_safe_address:
                # Later top-ups from the same funder.
                eth_transfers.append(transfer)

        reversion_transfers = [
            transfer
            for transfer in sorted_outgoing_transfers
            if transfer.get("symbol") == "ETH"
            and (transfer.get("to_address") or "").lower() == master_safe_address
            and (transfer.get("from_address") or "").lower() == safe_address.lower()
        ]

        return calculate_total_reversion_value(eth_transfers, reversion_transfers)

    except Exception as e:
        logger.error(f"Error tracking ETH transfers: {str(e)}")
        return 0.0


def calculate_total_reversion_value(
    eth_transfers: List[Dict],
    reversion_transfers: List[Dict]
) -> float:
    """Calculate the total USD value of the reversion transfers.

    The first reversion is priced at the date of the last incoming ETH
    transfer; every later reversion is priced at its own date.

    Args:
        eth_transfers: Incoming ETH transfers from the funder, oldest first.
        reversion_transfers: Outgoing ETH transfers back to the funder.

    Returns:
        The summed USD value; 0.0 when either list is empty (previously an
        empty ``eth_transfers`` raised IndexError).
    """
    if not eth_transfers or not reversion_transfers:
        return 0.0

    reversion_value = 0.0
    last_transfer = eth_transfers[-1]

    try:
        timestamp = str(last_transfer.get("timestamp", ""))
        if timestamp.endswith("Z"):
            # ISO format
            tx_datetime = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        else:
            # Try parsing as Unix timestamp; interpret it as UTC so the
            # result does not depend on the host time zone.
            tx_datetime = datetime.fromtimestamp(int(timestamp), tz=timezone.utc)
        reversion_date = tx_datetime.strftime("%d-%m-%Y")
    except (ValueError, TypeError) as e:
        logger.warning(f"Error parsing timestamp: {e}")
        # Fall back to the date of the first reversion transfer
        reversion_date = datetime.fromisoformat(
            reversion_transfers[0].get("timestamp", "").replace("Z", "+00:00")
        ).strftime("%d-%m-%Y")

    for index, transfer in enumerate(reversion_transfers):
        transfer_date = datetime.fromisoformat(
            transfer.get("timestamp", "").replace("Z", "+00:00")
        ).strftime("%d-%m-%Y")

        price_date = reversion_date if index == 0 else transfer_date
        eth_price = fetch_historical_eth_price(price_date)

        if eth_price:
            reversion_value += transfer.get("amount", 0) * eth_price

    return reversion_value


def calculate_initial_investment_value_from_funding_events(
    chain: str,
    address: str,
    incoming_transfers: Dict,
    outgoing_transfers: Dict,
) -> float:
    """Compute the USD value initially invested into a safe.

    Every incoming transfer is valued at its historical price on the transfer
    date; the value of ETH reverted to the funder is then subtracted.

    Args:
        chain: Chain name; only "optimism" is supported.
        address: The safe address.
        incoming_transfers: ``YYYY-MM-DD`` -> list of incoming transfers.
        outgoing_transfers: ``YYYY-MM-DD`` -> list of outgoing transfers.

    Returns:
        The net invested value in USD, never negative.
    """
    if not incoming_transfers:
        logger.info(f"No transfers found for {chain} chain")
        return 0.0

    if chain != "optimism":
        logger.info(f"Unsupported chain: {chain}, skipping")
        return 0.0

    logger.info("Using Optimism-specific transfer processing")
    total_investment = 0.0

    for date, date_transfers in incoming_transfers.items():
        try:
            # CoinGecko expects dd-mm-YYYY
            date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y")
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing transfer: {str(e)}")
            continue

        for transfer in date_transfers:
            try:
                amount = transfer.get("amount", 0)
                token_symbol = transfer.get("symbol", "").upper()

                if amount <= 0:
                    continue

                if token_symbol == "ETH":  # nosec B105
                    price = fetch_historical_eth_price(date_str)
                else:
                    coingecko_id = get_coin_id_from_symbol(token_symbol, chain)
                    price = (
                        fetch_historical_token_price(coingecko_id, date_str, token_symbol)
                        if coingecko_id
                        else None
                    )

                # Skip explicitly instead of relying on ``amount * None``
                # raising TypeError; a 0.0 price contributes nothing anyway.
                if not price:
                    logger.warning(
                        f"No price available for {token_symbol} on {date}, skipping transfer"
                    )
                    continue

                transfer_value = amount * price
                total_investment += transfer_value

                logger.info(
                    f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}"
                )

            except Exception as e:
                logger.error(f"Error processing transfer: {str(e)}")
                continue

    reversion_value = track_and_calculate_reversion_value(
        safe_address=address,
        chain=chain,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    logger.info(f"Total investment: {total_investment}")
    logger.info(f"Reversion value: {reversion_value}")
    total_investment = total_investment - reversion_value
    logger.info(f"Total investment after reversion: {total_investment}")

    logger.info(f"Total initial investment from {chain} chain: ${total_investment}")
    return total_investment if total_investment > 0 else 0.0


def calculate_initial_value_from_address_and_timestamp(
    address: str,
    final_timestamp: int,
) -> Tuple[float, int]:
    """Compute the initial investment of a safe and when it started.

    Args:
        address: The safe address.
        final_timestamp: Unix timestamp; later transfers are ignored.

    Returns:
        ``(initial_investment_usd, initial_timestamp)`` where the timestamp
        is that of the earliest incoming transfer, or ``final_timestamp``
        when there is none.
    """
    # First fetch the transfers
    incoming_transfers = fetch_optimism_incoming_transfers(address, final_timestamp)
    logger.info("Fetched incoming transfers")

    # Find the first transfer to get the from_address
    from_address = None
    for date_transfers in incoming_transfers.values():
        if date_transfers:  # Check if the list is not empty
            from_address = date_transfers[0].get('from_address')
            break

    if from_address is None:
        logger.warning("No from_address found in incoming transfers")
        from_address = ""

    outgoing_transfers = fetch_optimism_outgoing_transfers(address, final_timestamp, from_address)
    logger.info(f"Fetched outgoing transfers {outgoing_transfers}")

    initial_timestamp = final_timestamp
    for _transfers in incoming_transfers.values():
        for _transfer in _transfers:
            if "timestamp" not in _transfer:
                continue

            transfer_datetime = get_datetime_from_timestamp(_transfer["timestamp"])
            if transfer_datetime is None:
                continue

            transfer_timestamp = transfer_datetime.timestamp()
            if transfer_timestamp < initial_timestamp:
                initial_timestamp = int(transfer_timestamp)

    # Then calculate initial investment
    initial_investment = calculate_initial_investment_value_from_funding_events(
        chain="optimism",
        address=address,
        incoming_transfers=incoming_transfers,
        outgoing_transfers=outgoing_transfers,
    )

    return initial_investment, int(initial_timestamp)


def calculate_final_value_from_address_and_timestamp(
    address: str,
    timestamp: int,
) -> float:
    """
    Calculate the final portfolio value at a specific timestamp by fetching
    ETH and token balances and multiplying by historical prices.

    Returns:
        The portfolio value in USD, or 0.0 when the ETH part of the
        calculation fails. Failures on individual tokens are logged and
        skipped.
    """
    final_value = 0.0
    # CoinGecko expects dd-mm-YYYY; computed once for every price lookup.
    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%d-%m-%Y")

    try:
        # Get ETH balance and price
        eth_balance = fetch_eth_balance(address, timestamp)
        if eth_balance > 0:
            eth_price = fetch_historical_eth_price(date_str)
            if eth_price and eth_price > 0:
                eth_value = eth_balance * eth_price
                final_value += eth_value
                logger.info(f"ETH value: {eth_balance:.6f} ETH @ ${eth_price:.2f} = ${eth_value:.2f}")
            else:
                logger.warning(f"Could not fetch ETH price for timestamp {timestamp}")

        # Get token balances and prices
        for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items():
            try:
                token_balance = fetch_token_balance(
                    address=address,
                    token_address=token_address,
                    decimals=decimals,
                    timestamp=timestamp,
                )

                if token_balance is None or token_balance <= 0:
                    continue

                token_price = fetch_historical_token_price(
                    coin_id=COIN_ID_MAPPING.get(symbol.lower(), symbol.lower()),
                    date_str=date_str,
                    token_symbol=symbol
                )

                if token_price is not None and token_price > 0:
                    token_value = token_balance * token_price
                    final_value += token_value
                    logger.info(f"{symbol} value: {token_balance:.6f} @ ${token_price:.6f} = ${token_value:.2f}")
                else:
                    logger.warning(f"Could not fetch price for {symbol} at timestamp {timestamp}")

            except Exception as e:
                logger.error(f"Error processing token {symbol} ({token_address}): {e}")
                continue

    except Exception as e:
        logger.error(f"Error calculating final value for address {address}: {e}")
        return 0.0

    logger.info(f"Total final value for {address}: ${final_value:.2f}")
    return final_value


def _calculate_adjusted_apr(
    apr: float,
    initial_timestamp: int,
    final_timestamp: int
) -> float:
    """Adjust ``apr`` for the ETH price change between the two timestamps.

    The adjustment adds ``(1 - final_eth_price / initial_eth_price) * 100``.
    When either price is unavailable the APR is returned unchanged. Price
    fetches report failure as 0.0, so a plain ``is not None`` check would
    either divide by zero or add a spurious +100.
    """
    if apr is None or apr == 0:
        return 0.0

    initial_eth_price = fetch_historical_eth_price(
        datetime.fromtimestamp(initial_timestamp, tz=timezone.utc).strftime("%d-%m-%Y")
    )
    final_eth_price = fetch_historical_eth_price(
        datetime.fromtimestamp(final_timestamp, tz=timezone.utc).strftime("%d-%m-%Y")
    )

    if not initial_eth_price or not final_eth_price:
        logger.warning(
            f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}"
        )
        return apr

    adjustment_factor = Decimal("1") - (
        Decimal(str(final_eth_price)) / Decimal(str(initial_eth_price))
    )
    return round(
        float(apr) + float(adjustment_factor * Decimal("100")),
        2,
    )


def calculate_apr_and_roi(
    initial_value: float,
    final_value: float,
    initial_timestamp: int,
    final_timestamp: int
) -> Tuple[float, float, float]:
    """Compute APR, ETH-adjusted APR and ROI (all in percent).

    Args:
        initial_value: Invested value in USD.
        final_value: Portfolio value in USD at ``final_timestamp``.
        initial_timestamp: Unix timestamp of the first investment.
        final_timestamp: Unix timestamp of the valuation.

    Returns:
        ``(apr, adjusted_apr, roi)``, each rounded to 2 decimals; all zeros
        when either value is non-positive.
    """
    if final_value <= 0:
        logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    if initial_value <= 0:
        # Avoid ZeroDivisionError / meaningless ratios.
        logger.warning("Initial value is non-positive, returning 0.0 for APR and ROI.")
        return 0.0, 0.0, 0.0

    # Calculate ROI (Return on Investment)
    roi = ((final_value / initial_value) - 1) * 100

    # Calculate hours since investment (at least one hour)
    hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600)

    # Calculate time ratio (hours in a year / hours since investment)
    hours_in_year = 8760
    time_ratio = hours_in_year / hours

    # Calculate APR (Annualized ROI); losses are not annualised
    apr = float(roi * time_ratio)
    if apr < 0:
        apr = roi

    adjust_apr = _calculate_adjusted_apr(
        apr=apr,
        initial_timestamp=initial_timestamp,
        final_timestamp=final_timestamp
    )

    return float(round(apr, 2)), float(round(adjust_apr, 2)), float(round(roi, 2))


def fix_apr_and_roi(df: DataFrame) -> DataFrame:
    """
    Fix APR and ROI values by recalculating them based on actual blockchain data.

    Rows with excluded addresses, timestamps before 2025-06-06 or timestamps
    in the future are removed. Dummy rows are kept untouched. Every other row
    is recalculated once; rows whose recalculation fails, or whose initial
    value is non-positive, are dropped.

    Args:
        df: Frame with at least the columns ``address``, ``timestamp``
            (datetime-like values exposing ``.timestamp()``), ``is_dummy``
            and ``calculation_metrics`` (dict per row); ``volume`` is
            optional.

    Returns:
        A new DataFrame with updated ``apr``, ``adjusted_apr``, ``roi`` and
        ``calculation_metrics``. The input frame and its metric dicts are
        not modified.
    """
    if df.empty:
        logger.info("Empty DataFrame provided to fix_apr_and_roi, returning as-is")
        return df

    logger.info(f"Starting fix_apr_and_roi with {len(df)} rows")

    # Remove rows with excluded addresses
    original_count = len(df)
    df = df[~df['address'].isin(EXCLUDED_ADDRESSES)]
    excluded_count = original_count - len(df)
    if excluded_count > 0:
        logger.info(f"Excluded {excluded_count} rows with excluded addresses")

    # Remove rows with timestamps before 2025-06-06
    original_count = len(df)
    df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000']
    old_data_count = original_count - len(df)
    if old_data_count > 0:
        logger.info(f"Excluded {old_data_count} rows with timestamps before 2025-06-06")

    # Bail out before the future-timestamp filter: applying a function to an
    # empty datetime column keeps the datetime dtype, and comparing that to a
    # float raises TypeError.
    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df

    # Check for future timestamps and filter them out
    current_time = datetime.now().timestamp()
    row_epochs = df['timestamp'].apply(lambda x: x.timestamp())
    future_rows = df[row_epochs > current_time]
    if not future_rows.empty:
        logger.warning(f"Found {len(future_rows)} rows with future timestamps, excluding them")
        for _, row in future_rows.iterrows():
            logger.warning(f"Future timestamp found: {row['timestamp']} (timestamp: {row['timestamp'].timestamp()})")

    # Own the data explicitly so the writes below never target a slice of the
    # caller's frame.
    df = df[row_epochs <= current_time].copy()

    if df.empty:
        logger.warning("No valid rows remaining after filtering, returning empty DataFrame")
        return df

    logger.info(f"Processing {len(df)} valid rows")

    # Iterate over a snapshot so that writes to df cannot affect iteration.
    snapshot = df.copy()
    rows_to_drop = []
    processed_count = 0

    for idx, row in snapshot.iterrows():
        try:
            if row['is_dummy']:
                logger.debug(f"Skipping dummy row {idx}")
                continue

            processed_count += 1
            address = row['address']
            logger.info(f"Processing row {processed_count}/{len(snapshot)} - Address: {address}")

            final_timestamp = int(row['timestamp'].timestamp())

            # Calculate initial value and timestamp with error handling
            try:
                initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(
                    address, final_timestamp
                )
            except Exception as e:
                logger.error(f"Error calculating initial value for address {address}: {e}")
                rows_to_drop.append(idx)
                continue

            # Calculate final value with error handling
            try:
                final_value = calculate_final_value_from_address_and_timestamp(
                    address, final_timestamp
                )

                # Add volume if it exists and is positive
                volume = row.get("volume", 0)
                if volume and volume > 0:
                    final_value += volume
                    logger.info(f"Added volume ${volume:.2f} to final value for address {address}")
            except Exception as e:
                logger.error(f"Error calculating final value for address {address}: {e}")
                rows_to_drop.append(idx)
                continue

            if initial_value <= 0:
                logger.warning(f"Initial value for address {address} is non-positive ({initial_value}), skipping row.")
                rows_to_drop.append(idx)
                continue

            # Build a new metrics dict instead of mutating the one shared
            # with the caller's frame. A non-mapping value raises TypeError
            # and the row is dropped by the handler below.
            updated_metrics = {
                **row['calculation_metrics'],
                'initial_value': initial_value,
                'final_value': final_value,
            }
            df.at[idx, 'calculation_metrics'] = updated_metrics

            # Calculate APR and ROI with error handling
            try:
                apr, adjusted_apr, roi = calculate_apr_and_roi(
                    initial_value=initial_value,
                    final_value=final_value,
                    initial_timestamp=initial_timestamp,
                    final_timestamp=final_timestamp
                )
                df.at[idx, 'apr'] = apr
                df.at[idx, 'adjusted_apr'] = adjusted_apr
                df.at[idx, 'roi'] = roi

                logger.info(f"Successfully processed address {address}: APR={apr:.2f}, ROI={roi:.2f}")

            except Exception as e:
                logger.error(f"Error calculating APR/ROI for address {address}: {e}")
                rows_to_drop.append(idx)
                continue

        except Exception as e:
            logger.error(f"Unexpected error processing row {idx}: {e}")
            rows_to_drop.append(idx)
            continue

    # Drop rows that had errors
    if rows_to_drop:
        logger.info(f"Dropping {len(rows_to_drop)} rows due to errors")
        df = df.drop(rows_to_drop)

    logger.info(f"Completed fix_apr_and_roi: {len(df)} rows remaining")
    return df


if __name__ == "__main__":
    test_address = "0xc8E264f402Ae94f69bDEf8B1f035F7200cD2B0c7"
    test_final_timestamp = 1750711233

    v = calculate_initial_value_from_address_and_timestamp(
        test_address,
        test_final_timestamp
    )
    print(v)