# Sample CSV values: # apr,adjusted_apr,timestamp,portfolio_snapshot,calculation_metrics,roi,agent_id,is_dummy,address,agent_name,metric_type,first_investment_timestamp,agent_hash,volume,trading_type,selected_protocols # -0.03,1.75,2025-05-15 21:37:27.000000,"{'portfolio': {'portfolio_value': 29.34506065817397, 'allocations': [{'chain': 'optimism', 'type': 'velodrome', 'id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'assets': ['FRAX', 'alUSD'], 'apr': 11.9, 'details': 'Velodrome Pool', 'ratio': 100.0, 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}], 'portfolio_breakdown': [{'asset': 'FRAX', 'address': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'balance': 12.498312351563191, 'price': 0.999924, 'value_usd': 12.497362479824472, 'ratio': 0.425876}, {'asset': 'alUSD', 'address': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'balance': 17.023792285753334, 'price': 0.989656, 'value_usd': 16.8476981783495, 'ratio': 0.574124}], 'address': '0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758'}, 'positons': [{'chain': 'optimism', 'pool_address': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'dex_type': 'velodrome', 'token0': '0x2E3D870790dC77A83DD1d18184Acc7439A53f475', 'token1': '0xCB8FA9a76b8e203D8C3797bF438d8FB81Ea3326A', 'token0_symbol': 'FRAX', 'token1_symbol': 'alUSD', 'apr': 11.901789131732096, 'pool_id': '0xaF03f51DE7a0E62BF061F6Fc3931cF79166B0a29', 'is_stable': True, 'is_cl_pool': False, 'amount0': 12549523370531409633, 'amount1': 16972223462662011900, 'timestamp': 1747319387, 'status': 'open', 'tx_hash': '0xb487bb4a45bcd7bb3b9e9e3fabe76bf6594828091598ffab69704754b4c8bea8'}]}","{'initial_value': 29.353178464538146, 'final_value': 29.34506065817397, 'f_i_ratio': -0.0002765562977782299, 'last_investment_timestamp': 1747319387, 'time_ratio': 5380.753851502806}",-0.0002765562977782299,86,False,0xAD588C11Ea73123fDe199C5C4F7F75C6e495C758,nusus-tayar25,APR,,,,, # Parse the optimus_apr_values.csv file # Iterate on the rows: For each row: # Parse address, final_value # Compute initial_value using the parsed address similar to an Optimus function # Compute the APR and ROI similar to an Optimus function # Write the row with initial_value, APR, and ROI to a new CSV file from datetime import datetime from decimal import Decimal import json import logging import os import time from typing import Dict, Optional, Tuple from pandas import DataFrame import requests from web3 import Web3 ETHERSCAN_API_KEY = "" EXCLUDED_ADDRESSES = { # Testnet agents of Gaurav, Divya, and Priyanshu "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7DC8", # agent_id 84 "0x6aA9b180c1a4Ef43Ea540Da905f23BAfAEEB7CB5", # agent_id 86 "0x3B3AbC1604fAd139F841Da5c3Cad73a72621fee4", # agent_id 102 } COINGECKO_PRICE_API_URL = "https://api.coingecko.com/api/v3/coins/{coin_id}/history?date={date}}" WHITELISTED_TOKENS = { # Optimism tokens - stablecoins "0x0b2c639c533813f4aa9d7837caf62653d097ff85": ("USDC", 6), "0x01bff41798a0bcf287b996046ca68b395dbc1071": ("USDT0", 6), "0x94b008aa00579c1307b0ef2c499ad98a8ce58e58": ("USDT", 6), "0x7f5c764cbc14f9669b88837ca1490cca17c31607": ("USDC.e", 6), "0x8ae125e8653821e851f12a49f7765db9a9ce7384": ("DOLA", 18), "0xc40f949f8a4e094d1b49a23ea9241d289b7b2819": ("LUSD", 18), "0xda10009cbd5d07dd0cecc66161fc93d7c9000da1": ("DAI", 18), "0x087c440f251ff6cfe62b86dde1be558b95b4bb9b": ("BOLD", 18), "0x2e3d870790dc77a83dd1d18184acc7439a53f475": ("FRAX", 18), "0x2218a117083f5b482b0bb821d27056ba9c04b1d3": ("sDAI", 18), "0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189": ("oUSDT", 6), "0x4f604735c1cf31399c6e711d5962b2b3e0225ad3": ("USDGLO", 18), } COIN_ID_MAPPING = { "usdc": "usd-coin", "alusd": "alchemix-usd", "usdt0": "usdt0", "usdt": "bridged-usdt", "usdc.e": "bridged-usd-coin-optimism", "usx": "token-dforce-usd", "dola": "dola-usd", "lusd": "liquity-usd", "dai": "makerdao-optimism-bridged-dai-optimism", "bold": "liquity-bold", "frax": "frax", "sdai": "savings-dai", "usd+": "overnight-fi-usd-optimism", "ousdt": "openusdt", "usdglo": "glo-dollar", } # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) w3 = Web3(Web3.HTTPProvider("https://rpc-gate.autonolas.tech/optimism-rpc/")) def get_coin_id_from_symbol(symbol: str, chain: str) -> Optional[str]: """Map token symbol to CoinGecko ID.""" if chain == "optimism": coin_id_map = { "USDC": "usd-coin", "ALUSD": "alchemix-usd", "USDT0": "usdt0", "USDT": "bridged-usdt", "MSUSD": None, "USDC.E": "bridged-usd-coin-optimism", "USX": "token-dforce-usd", "DOLA": "dola-usd", "LUSD": "liquity-usd", "DAI": "makerdao-optimism-bridged-dai-optimism", "BOLD": "liquity-bold", "FRAX": "frax", "SDAI": "savings-dai", "USD+": "overnight-fi-usd-optimism", "OUSDT": "openusdt", "USDGLO": "glo-dollar", "ETH": "ethereum", "WETH": "ethereum", "WBTC": "wrapped-bitcoin", } return coin_id_map.get(symbol.upper()) return None def load_cache(name: str) -> Dict: """Load price cache from JSON file.""" cache_file = f"{name}_cache.json" if os.path.exists(cache_file): try: with open(cache_file, 'r') as f: return json.load(f) except json.JSONDecodeError: logger.warning("Cache file corrupted, creating new cache") return {} return {} def save_cache(name: str, cache: Dict): """Save price cache to JSON file.""" cache_file = f"{name}_cache.json" with open(cache_file, 'w') as f: json.dump(cache, f, indent=2) def get_cached_price(date_str: str, token_symbol: str) -> Optional[float]: """Get price from cache if available.""" cache = load_cache(name="price") return cache.get(date_str, {}).get(token_symbol) def update_price_cache(date_str: str, token_symbol: str, price: float): """Update price cache with new value.""" cache = load_cache(name="price") if date_str not in cache: cache[date_str] = {} cache[date_str][token_symbol] = price save_cache(name="price", cache=cache) def get_cached_request(cache_key: str) -> Optional[Dict]: """Get cached request response if available.""" cache = load_cache(name="request") return cache.get(cache_key) def update_request_cache(cache_key: str, response: Dict): """Update request cache with new response.""" cache = load_cache(name="request") cache[cache_key] = response save_cache(name="request", cache=cache) def fetch_historical_eth_price(date_str: str) -> float: """Fetch historical ETH price from CoinGecko with caching.""" # Check cache first cached_price = get_cached_price(date_str, "ETH") if cached_price is not None: return cached_price try: url = f"https://api.coingecko.com/api/v3/coins/ethereum/history" params = {"date": date_str, "localization": "false"} # Add delay to respect rate limits time.sleep(1.2) response = requests.get(url, params=params) response.raise_for_status() data = response.json() if "market_data" in data and "current_price" in data["market_data"]: price = data["market_data"]["current_price"]["usd"] # Update cache update_price_cache(date_str, "ETH", price) return price return 0.0 except Exception as e: print(f"Error fetching ETH price for {date_str}: {str(e)}") return 0.0 def fetch_historical_token_price(coin_id: str, date_str: str, token_symbol: str) -> float: """Fetch historical token price from CoinGecko with caching.""" # Check cache first cached_price = get_cached_price(date_str, token_symbol) if cached_price is not None: return cached_price try: success, data = request_with_retries( endpoint=f"https://api.coingecko.com/api/v3/coins/{coin_id}/history", params={"date": date_str, "localization": "false"}, ) if not success: logger.error(f"Failed to fetch historical price for {coin_id} on {date_str}") return 0.0 # Add delay to respect rate limits time.sleep(1.2) if "market_data" in data and "current_price" in data["market_data"]: price = data["market_data"]["current_price"]["usd"] # Update cache update_price_cache(date_str, token_symbol, price) return price return 0.0 except Exception as e: print(f"Error fetching price for {coin_id} on {date_str}: {str(e)}") return 0.0 def get_block_at_timestamp( timestamp: int, chain: str = "optimism" ) -> Optional[int]: success, res = request_with_retries( endpoint=f"https://api-optimistic.etherscan.io/api?module=block&action=getblocknobytime×tamp={timestamp}&closest=before&apikey={ETHERSCAN_API_KEY}", ) if success and res.get("status") == "1" and "result" in res: return int(res.get("result")) else: logger.error(f"Failed to fetch block at timestamp {timestamp} for {chain}: {res.get('message', 'Unknown error')}") return None def fetch_eth_balance(address: str, timestamp: float) -> float: key = "eth_balance" cache = load_cache(name=key) if f"{address}_{timestamp}" in cache: return cache[f"{address}_{timestamp}"] / (10 ** 18) balance = w3.eth.get_balance( account=Web3.to_checksum_address(address), block_identifier=get_block_at_timestamp(int(timestamp)) ) cache[f"{address}_{timestamp}"] = balance save_cache(name=key, cache=cache) return balance / (10 ** 18) def fetch_token_balance( address: str, token_address: str, timestamp: int, decimals: int = 18 ) -> Optional[float]: contract = w3.eth.contract( address=Web3.to_checksum_address(token_address), abi=[ { "constant": True, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "", "type": "uint256"}], "payable": False, "stateMutability": "view", "type": "function", } ] ) try: cache_key = f"token_balance_{address}_{token_address}_{timestamp}" cache = load_cache(name="token_balance") if cache_key in cache: return cache[cache_key] / (10 ** decimals) balance = contract.functions.balanceOf(address).call(block_identifier=get_block_at_timestamp(int(timestamp))) cache[cache_key] = balance save_cache(name="token_balance", cache=cache) return balance / (10 ** decimals) if balance else 0.0 except Exception as e: logger.error(f"Error fetching token balance for {address} at {timestamp}: {e}") return None def get_datetime_from_timestamp(timestamp: str) -> Optional[datetime]: """Convert timestamp string to datetime object.""" try: return datetime.fromisoformat(timestamp.replace("Z", "+00:00")) except (ValueError, TypeError): logger.warning(f"Invalid timestamp format: {timestamp}") return None def request_with_retries( endpoint: str, params: Dict = None, headers: Dict = None, method: str = "GET", body: Dict = None, rate_limited_code: int = 429, retry_wait: int = 5, max_retries: int = 3 ) -> Tuple[bool, Dict]: for attempt in range(max_retries): try: if method.upper() == "POST": cache_key = f"POST_{endpoint}_{str(body or {})}" cached_response = get_cached_request(cache_key) if cached_response is not None: return len(cached_response) > 0, cached_response response = requests.post(endpoint, headers=headers, json=body) if response.ok: update_request_cache(cache_key, response.json()) else: # Check cache first for GET requests cache_key = f"{endpoint}_{str(params or {})}" cached_response = get_cached_request(cache_key) if cached_response is not None: return len(cached_response) > 0, cached_response response = requests.get(endpoint, headers=headers, params=params or {}) # Cache successful responses if response.status_code == 200: update_request_cache(cache_key, response.json()) elif response.status_code == 404: update_request_cache(cache_key, {}) if response.status_code == rate_limited_code: logger.warning(f"Rate limited. Waiting {retry_wait} seconds...") time.sleep(retry_wait) continue if response.status_code != 200: logger.error(f"Request failed with status {response.status_code}") return False, {} return True, response.json() except Exception as e: logger.error(f"Request failed: {str(e)}") if attempt < max_retries - 1: time.sleep(retry_wait) continue return False, {} return False, {} def should_include_transfer_optimism( from_address: str ) -> bool: """Determine if an Optimism transfer should be included based on from address type.""" if not from_address: return False # Exclude zero address if from_address.lower() in [ "0x0000000000000000000000000000000000000000", "0x0", "", ]: return False try: # Use Optimism RPC to check if address is a contract payload = { "jsonrpc": "2.0", "method": "eth_getCode", "params": [from_address, "latest"], "id": 1, } success, result = request_with_retries( endpoint="https://mainnet.optimism.io", method="POST", headers={"Content-Type": "application/json"}, body=payload, rate_limited_code=429, retry_wait=5, ) if not success: logger.error("Failed to check contract code") return False code = result.get("result", "0x") # If code is '0x', it's an EOA if code == "0x": return True # If it has code, check if it's a GnosisSafe safe_check_url = f"https://safe-transaction-optimism.safe.global/api/v1/safes/{from_address}/" success, _ = request_with_retries( endpoint=safe_check_url, headers={"Accept": "application/json"}, rate_limited_code=429, retry_wait=5, ) if success: return True logger.info( f"Excluding transfer from contract: {from_address}" ) return False except Exception as e: logger.error(f"Error checking address {from_address}: {e}") return False def fetch_optimism_transfers( address: str, last_timestamp: int ) -> Dict: base_url = "https://safe-transaction-optimism.safe.global/api/v1" all_transfers_by_date = {} try: logger.info("Fetching Optimism transfers using SafeGlobal API...") last_date = datetime.fromtimestamp(last_timestamp).strftime("%Y-%m-%d") # Fetch incoming transfers transfers_url = f"{base_url}/safes/{address}/incoming-transfers/" processed_count = 0 while True: success, response_json = request_with_retries( endpoint=transfers_url, headers={"Accept": "application/json"}, rate_limited_code=429, retry_wait=5 ) if not success: logger.error("Failed to fetch Optimism transfers") break transfers = response_json.get("results", []) if not transfers: break for transfer in transfers: # Parse timestamp timestamp = transfer.get("executionDate") if not timestamp: continue tx_datetime = get_datetime_from_timestamp(timestamp) tx_date = tx_datetime.strftime("%Y-%m-%d") if tx_datetime else None if not tx_date: continue if tx_datetime.timestamp() > last_timestamp: continue # Process the transfer from_address = transfer.get("from", address) transfer_type = transfer.get("type", "") if from_address.lower() == address.lower(): continue # Initialize date in transfers dict if not exists if tx_date not in all_transfers_by_date: all_transfers_by_date[tx_date] = [] should_include = should_include_transfer_optimism( from_address ) if not should_include: continue # Process different transfer types if transfer_type == "ERC20_TRANSFER": # Token transfer token_info = transfer.get("tokenInfo", {}) token_address = transfer.get("tokenAddress", "") if not token_info: if not token_address: continue # You might want to add token decimal and symbol fetching here symbol = "Unknown" decimals = 18 else: symbol = token_info.get("symbol", "Unknown") decimals = int(token_info.get("decimals", 18) or 18) if symbol.lower() != "usdc": continue value_raw = int(transfer.get("value", "0") or "0") amount = value_raw / (10**decimals) transfer_data = { "from_address": from_address, "amount": amount, "token_address": token_address, "symbol": symbol, "timestamp": timestamp, "tx_hash": transfer.get("transactionHash", ""), "type": "token" } elif transfer_type == "ETHER_TRANSFER": # ETH transfer try: value_wei = int(transfer.get("value", "0") or "0") amount_eth = value_wei / 10**18 if amount_eth <= 0: continue except (ValueError, TypeError): logger.warning(f"Skipping transfer with invalid value: {transfer.get('value')}") continue transfer_data = { "from_address": from_address, "amount": amount_eth, "token_address": "", "symbol": "ETH", "timestamp": timestamp, "tx_hash": transfer.get("transactionHash", ""), "type": "eth" } else: # Skip other transfer types continue all_transfers_by_date[tx_date].append(transfer_data) processed_count += 1 # Show progress if processed_count % 100 == 0: logger.info(f"Processed {processed_count} Optimism transfers...") # Check for next page cursor = response_json.get("next") if not cursor: break logger.info(f"Completed Optimism transfers: {processed_count} found") return all_transfers_by_date except Exception as e: logger.error(f"Error fetching Optimism transfers: {e}") return {} def calculate_initial_investment_value_from_funding_events( transfers: Dict, chain: str, ) -> float: total_investment = 0.0 if not transfers: print(f"No transfers found for {chain} chain") return 0.0 if chain == "optimism": print("Using Optimism-specific transfer processing") for date, date_transfers in transfers.items(): for transfer in date_transfers: try: amount = transfer.get("amount", 0) token_symbol = transfer.get("symbol", "").upper() if amount <= 0: continue # Get historical price for the transfer date date_str = datetime.strptime(date, "%Y-%m-%d").strftime("%d-%m-%Y") if token_symbol == "ETH": # nosec B105 price = fetch_historical_eth_price(date_str) else: coingecko_id = get_coin_id_from_symbol(token_symbol, chain) if coingecko_id: price = fetch_historical_token_price( coingecko_id, date_str, token_symbol ) else: price = None transfer_value = amount * price total_investment += transfer_value print(f"Processed transfer on {date}: {amount} {token_symbol} @ ${price} = ${transfer_value}") except Exception as e: print(f"Error processing transfer: {str(e)}") continue else: print(f"Unsupported chain: {chain}, skipping") return 0.0 print(f"Total initial investment from {chain} chain: ${total_investment}") return total_investment if total_investment > 0 else 0.0 def calculate_initial_value_from_address_and_timestamp( address: str, final_timestamp: int, ) -> Tuple[float, int]: # First fetch the transfers transfers = fetch_optimism_transfers(address, final_timestamp) initial_timestamp = final_timestamp for _transfers in transfers.values(): for _transfer in _transfers: if "timestamp" not in _transfer: continue transfer_timestamp = datetime.fromisoformat(_transfer["timestamp"].replace('Z', '+00:00')).timestamp() if transfer_timestamp < initial_timestamp: initial_timestamp = int(transfer_timestamp) # Then calculate initial investment initial_investment = calculate_initial_investment_value_from_funding_events( transfers=transfers, chain="optimism", ) return initial_investment, int(initial_timestamp) def calculate_final_value_from_address_and_timestamp( address: str, timestamp: int, ) -> float: eth_balance = fetch_eth_balance(address, timestamp) eth_price = fetch_historical_eth_price( datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y") ) final_value = eth_balance * eth_price for token_address, (symbol, decimals) in WHITELISTED_TOKENS.items(): token_balance = fetch_token_balance( address=address, token_address=token_address, decimals=decimals, timestamp=timestamp, ) token_price = fetch_historical_token_price( coin_id=COIN_ID_MAPPING.get(symbol.lower(), symbol.lower()), date_str=datetime.utcfromtimestamp(timestamp).strftime("%d-%m-%Y"), token_symbol=symbol ) if token_balance is not None and token_price is not None: token_value = token_balance * token_price if token_value > 0: final_value += token_value return final_value def _calculate_adjusted_apr( apr: float, initial_timestamp: int, final_timestamp: int ) -> float: if apr is None or apr == 0: return 0.0 intial_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(initial_timestamp).strftime("%d-%m-%Y")) final_eth_price = fetch_historical_eth_price(datetime.utcfromtimestamp(final_timestamp).strftime("%d-%m-%Y")) if ( final_eth_price is not None and intial_eth_price is not None ): adjustment_factor = Decimal("1") - ( Decimal(str(final_eth_price)) / Decimal(str(intial_eth_price)) ) adjusted_apr = round( float(apr) + float(adjustment_factor * Decimal("100")), 2, ) return adjusted_apr else: logger.warning( f"Could not fetch ETH prices for timestamps {initial_timestamp} and {final_timestamp}. Returning original APR: {apr}" ) return apr def calculate_apr_and_roi( initial_value: float, final_value: float, initial_timestamp: int, final_timestamp: int ) -> Tuple[float, float, float]: if final_value <= 0: logger.warning("Final value is non-positive, returning 0.0 for APR and ROI.") return 0.0, 0.0, 0.0 # Calculate ROI (Return on Investment) roi = ((final_value / initial_value) - 1) * 100 # Calculate hours since investment hours = max(1, (final_timestamp - int(initial_timestamp)) / 3600) # Calculate time ratio (hours in a year / hours since investment) hours_in_year = 8760 time_ratio = hours_in_year / hours # Calculate APR (Annualized ROI) apr = float(roi * time_ratio) if apr < 0: apr = roi adjust_apr = _calculate_adjusted_apr( apr=apr, initial_timestamp=initial_timestamp, final_timestamp=final_timestamp ) return float(round(apr, 2)), float(round(adjust_apr, 2)), float(round(roi, 2)) def fix_apr_and_roi(df: DataFrame) -> DataFrame: # Remove rows with excluded addresses df = df[~df['address'].isin(EXCLUDED_ADDRESSES)] # Remove rows with timestamps before 2025-06-06 df = df[df['timestamp'] >= '2025-06-06 00:00:00.000000'] for idx, row in df.iterrows(): if row['is_dummy']: continue final_timestamp = int(row['timestamp'].timestamp()) calculation_metrics = row['calculation_metrics'] initial_value, initial_timestamp = calculate_initial_value_from_address_and_timestamp(row['address'], final_timestamp) final_value = calculate_final_value_from_address_and_timestamp(row['address'], final_timestamp) if row["volume"] > 0: final_value += row["volume"] if initial_value <= 0: logger.warning(f"Initial value for address {row['address']} is non-positive, skipping row.") df = df.drop(idx) continue calculation_metrics['initial_value'] = initial_value calculation_metrics['final_value'] = final_value df.at[idx, 'calculation_metrics'] = calculation_metrics apr, adjusted_apr, roi = calculate_apr_and_roi( initial_value=initial_value, final_value=final_value, initial_timestamp=initial_timestamp, final_timestamp=final_timestamp ) df.at[idx, 'apr'] = apr df.at[idx, 'adjusted_apr'] = adjusted_apr df.at[idx, 'roi'] = roi return df if __name__ == "__main__": test_address = "0xa11417aeBF3932ee895008eDE8eA95616f488bCf" test_final_timestamp = 1749567654 v = calculate_initial_value_from_address_and_timestamp( test_address, test_final_timestamp ) print(v)