"""Gradio dashboard showing the rank trend of trending Hugging Face Spaces."""

from datetime import datetime, timedelta, timezone
from io import BytesIO

import gradio as gr
import pandas as pd
import plotly.express as px
import requests

DATA_URL = "https://huggingface.co/datasets/cfahlgren1/hub-stats/resolve/main/spaces.parquet"
WINDOW_DAYS = 30
TOP_N = 100
REQUEST_TIMEOUT_SECONDS = 60

# Separators used both to build and to parse the radio labels; keeping them in
# one place guarantees the formatter and the parser stay in sync.
_LABEL_ID_PREFIX_SEP = ': '
_LABEL_SCORE_MARKER = ' (Score'


def _format_choice(rank, space_id, score):
    """Build the radio label shown for one space."""
    return f"Rank {rank}{_LABEL_ID_PREFIX_SEP}{space_id}{_LABEL_SCORE_MARKER}: {score:.2f})"


def _parse_space_id(selection):
    """Extract the space id from a label produced by ``_format_choice``.

    Raises:
        ValueError: if ``selection`` does not have the expected label shape.
    """
    _, prefix_sep, remainder = selection.partition(_LABEL_ID_PREFIX_SEP)
    space_id, marker, _ = remainder.rpartition(_LABEL_SCORE_MARKER)
    if not prefix_sep or not marker:
        raise ValueError(f"Unrecognized selection format: {selection!r}")
    return space_id


def load_and_process_data():
    """Download the spaces dataset and derive per-day rankings.

    Only spaces created within the last ``WINDOW_DAYS`` days are kept. For
    every day in that window, the spaces that already existed on that day are
    ranked by ``trendingScore`` (descending, ties broken by ``id``).

    Note: every day is ranked with the single ``trendingScore`` column present
    in the downloaded file, so the "history" reflects which spaces existed on
    each day rather than how their scores evolved.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(daily_ranks_df, top_100_spaces)``.
        ``daily_ranks_df`` has columns ``id, date, rank, trendingScore,
        createdAt``; ``top_100_spaces`` is the best-ranked ``TOP_N`` rows of
        the latest day. Two empty DataFrames are returned if anything fails,
        so the UI can still start.
    """
    try:
        # A timeout prevents hanging forever; raise_for_status keeps an HTTP
        # error page from being handed to the parquet reader.
        response = requests.get(DATA_URL, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        df = pd.read_parquet(BytesIO(response.content))

        # Work in naive UTC on both sides of every comparison. utc=True
        # handles tz-aware and tz-naive input alike; mixing aware and naive
        # timestamps would otherwise raise a TypeError.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        window_start = now - timedelta(days=WINDOW_DAYS)
        df['createdAt'] = pd.to_datetime(df['createdAt'], utc=True).dt.tz_localize(None)
        df = df[df['createdAt'] >= window_start]

        # Sort once. Boolean-mask filtering preserves row order, so each
        # day's subset below is already in rank order. 'id' breaks ties so
        # ranks are unique and deterministic.
        df = df.sort_values(['trendingScore', 'id'], ascending=[False, True])
        created_dates = df['createdAt'].dt.date

        daily_ranks = []
        for day in pd.date_range(start=window_start, end=now, freq='D'):
            current_date = day.date()
            # Spaces that had been created on or before this day.
            snapshot = df[created_dates <= current_date].copy()
            snapshot['rank'] = range(1, len(snapshot) + 1)
            snapshot['date'] = current_date
            daily_ranks.append(
                snapshot[['id', 'date', 'rank', 'trendingScore', 'createdAt']]
            )

        daily_ranks_df = pd.concat(daily_ranks, ignore_index=True)

        # Top spaces of the most recent day.
        latest_date = daily_ranks_df['date'].max()
        top_100_spaces = (
            daily_ranks_df[daily_ranks_df['date'] == latest_date]
            .sort_values('rank')
            .head(TOP_N)
            .copy()
        )

        print(f"Total records: {len(daily_ranks_df)}")
        print(f"Unique spaces: {len(daily_ranks_df['id'].unique())}")
        print(f"Date range: {daily_ranks_df['date'].min()} to {daily_ranks_df['date'].max()}")

        return daily_ranks_df, top_100_spaces
    except Exception as e:
        # Deliberate best-effort boundary: the dashboard should still come up
        # (empty) when the download or processing fails.
        print(f"Error loading data: {e}")
        return pd.DataFrame(), pd.DataFrame()


def create_trend_chart(space_id, daily_ranks_df):
    """Return a Plotly line chart of the daily rank of ``space_id``.

    Args:
        space_id: id of the space to plot, or ``None``.
        daily_ranks_df: frame with at least ``id``, ``date`` and ``rank``.

    Returns:
        A Plotly figure, or ``None`` when there is nothing to plot or chart
        creation fails.
    """
    if space_id is None or daily_ranks_df.empty:
        return None

    try:
        space_data = daily_ranks_df[daily_ranks_df['id'] == space_id]
        if space_data.empty:
            return None

        space_data = space_data.sort_values('date')

        fig = px.line(
            space_data,
            x='date',
            y='rank',
            title=f'Daily Rank Trend for {space_id}',
            labels={'date': 'Date', 'rank': 'Rank'},
            markers=True,
        )
        fig.update_layout(
            xaxis_title="Date",
            yaxis_title="Rank",
            yaxis_autorange="reversed",  # put rank 1 at the top
            hovermode='x unified',
            plot_bgcolor='white',
            paper_bgcolor='white',
        )
        return fig
    except Exception as e:
        print(f"Error creating chart: {e}")
        return None


def update_display(selection):
    """Gradio callback: render the chart and details for the chosen space.

    Reads the module-level ``daily_ranks_df``.

    Args:
        selection: radio label in the format produced by ``_format_choice``,
            or a falsy value when nothing is selected.

    Returns:
        tuple: ``(figure_or_None, info_text)``.
    """
    if not selection:
        return None, "Please select a space"

    try:
        if daily_ranks_df.empty:
            return None, "No data available"

        space_id = _parse_space_id(selection)

        history = daily_ranks_df[daily_ranks_df['id'] == space_id]
        if history.empty:
            return None, f"No data found for {space_id}"

        # Most recent day's row for this space.
        latest_data = history.sort_values('date').iloc[-1]

        info_text = "\n".join([
            f"ID: {space_id}",
            f"Current Rank: {int(latest_data['rank'])}",
            f"Trending Score: {latest_data['trendingScore']:.2f}",
            f"Created At: {latest_data['createdAt'].strftime('%Y-%m-%d')}",
        ])

        chart = create_trend_chart(space_id, daily_ranks_df)
        return chart, info_text
    except Exception as e:
        # UI boundary: report the problem in the info box instead of crashing
        # the callback.
        print(f"Error in update_display: {e}")
        return None, f"Error processing data: {str(e)}"


# Load the data once at startup; update_display reads daily_ranks_df.
print("Loading initial data...")
daily_ranks_df, top_100_spaces = load_and_process_data()
print("Data loaded.")

# Build the Gradio interface.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Trending Spaces Dashboard")

    with gr.Row():
        with gr.Column(scale=1):
            # One label per top space, including its rank. itertuples yields
            # nothing for the empty frame returned when loading failed.
            space_choices = [
                _format_choice(row.rank, row.id, row.trendingScore)
                for row in top_100_spaces.itertuples(index=False)
            ]

            space_list = gr.Radio(
                choices=space_choices,
                label="Top 100 Trending Spaces",
                info="Select a space to view its rank trend",
                value=space_choices[0] if space_choices else None,
            )

            info_box = gr.Textbox(
                label="Space Details",
                value="",
                interactive=False,
                lines=4,
            )

        with gr.Column(scale=2):
            trend_plot = gr.Plot(
                label="Daily Rank Trend"
            )

    space_list.change(
        fn=update_display,
        inputs=[space_list],
        outputs=[trend_plot, info_box],
    )

    # The radio starts with a pre-selected value, but `change` does not fire
    # for it; render the initial selection when the page loads.
    demo.load(
        fn=update_display,
        inputs=[space_list],
        outputs=[trend_plot, info_box],
    )

if __name__ == "__main__":
    demo.launch(share=True)