"""Gradio app: long-only mean-variance portfolio optimizer.

Prices are read from an uploaded CSV (a date column plus one price column per
ticker) or downloaded with yfinance. Three modes are offered: maximum Sharpe
ratio, minimum variance for a target return, and an efficient-frontier scan.
"""
import os
import tempfile

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import yfinance as yf
from scipy.optimize import minimize

# Annualization factor used when the data spacing cannot be determined.
TRADING_DAYS_PER_YEAR = 252
# Floor applied to volatility before dividing, to avoid division by zero.
_MIN_VOL = 1e-9


# --- Helpers ---
def _parse_tickers(tickers_text):
    """Split comma-separated text into unique upper-case symbols, order kept."""
    symbols = (t.strip().upper() for t in (tickers_text or "").split(","))
    return list(dict.fromkeys(s for s in symbols if s))


def _date_close_frame(frame):
    """Return a tidy (Date, Close) frame from a date-indexed frame, or None.

    None is returned when the frame is missing, empty, has no ``Close``
    column, or its ``Close`` column holds no values.
    """
    if frame is None or frame.empty or "Close" not in frame.columns:
        return None
    closes = frame["Close"].dropna()
    if closes.empty:
        return None
    tidy = closes.reset_index()
    # The index may be called "Date" or "Datetime"; normalise the name.
    tidy.columns = ["Date", "Close"]
    return tidy


def _frame_for_ticker(data, ticker):
    """Return the sub-frame of a MultiIndex-column download for one ticker.

    Every column level is searched so the lookup does not depend on which
    level holds the ticker symbols. Returns None when the ticker is absent.
    """
    for level in range(data.columns.nlevels):
        if ticker in data.columns.get_level_values(level):
            return data.xs(ticker, axis=1, level=level)
    return None


def fetch_price_data_from_tickers(tickers_text, period="1y", interval="1d"):
    """Download close prices for comma-separated tickers via yfinance.

    Args:
        tickers_text: Comma-separated ticker symbols; blanks and duplicates
            are ignored and symbols are upper-cased.
        period: Passed through to ``yf.download``.
        interval: Passed through to ``yf.download``.

    Returns:
        dict mapping ticker -> DataFrame with columns ``Date`` and ``Close``.
        Tickers with no usable close prices are omitted; an empty dict is
        returned when nothing could be parsed or downloaded.
    """
    tickers = _parse_tickers(tickers_text)
    if not tickers:
        return {}
    data = yf.download(
        tickers,
        period=period,
        interval=interval,
        group_by='ticker',
        auto_adjust=True,
        progress=False,
        threads=True,
    )
    if data is None or data.empty:
        return {}

    result = {}
    if isinstance(data.columns, pd.MultiIndex):
        for t in tickers:
            tidy = _date_close_frame(_frame_for_ticker(data, t))
            if tidy is not None:
                result[t] = tidy
    else:
        # Flat columns carry a single price series; attribute it to the
        # first requested ticker (same fallback as before).
        tidy = _date_close_frame(data)
        if tidy is not None:
            result[tickers[0]] = tidy
    return result


def read_price_csv(uploaded_file):
    """Read a wide price CSV into per-ticker (Date, Close) frames.

    The date column is the one named ``Date`` (matched case-insensitively);
    if there is none, the first column is used. Every other column is treated
    as a ticker. Non-numeric cells are coerced to NaN and dropped.

    Args:
        uploaded_file: Path or file-like object accepted by ``pd.read_csv``.

    Returns:
        dict mapping column name -> DataFrame with columns ``Date`` and
        ``Close``, sorted by date. Columns without any numeric value are
        omitted.
    """
    df = pd.read_csv(uploaded_file)
    if df.shape[1] == 0:
        return {}

    if 'Date' in df.columns:
        date_col = 'Date'
    else:
        date_col = next(
            (c for c in df.columns if str(c).strip().lower() == 'date'),
            df.columns[0],
        )
    if date_col != 'Date':
        df = df.rename(columns={date_col: 'Date'})

    df['Date'] = pd.to_datetime(df['Date'])
    df = df.dropna(subset=['Date']).sort_values('Date')

    result = {}
    for col in df.columns:
        if str(col).strip().lower() == 'date':
            continue
        prices = pd.to_numeric(df[col], errors='coerce')
        series = pd.DataFrame({'Date': df['Date'], 'Close': prices})
        # Drop rows where this ticker has no price; the date is always set,
        # so dropping on "all NaN" would never remove anything.
        series = series.dropna(subset=['Close'])
        if not series.empty:
            result[col] = series
    return result


def build_price_matrix(price_dfs):
    """Align per-ticker price frames into one date-indexed price matrix.

    Args:
        price_dfs: dict ticker -> DataFrame with columns ``Date``, ``Close``.

    Returns:
        DataFrame indexed by date with one column per ticker, restricted to
        dates on which every ticker has a price. Empty if no input frames.
    """
    columns = []
    for ticker, df in price_dfs.items():
        col = df.set_index('Date')['Close'].rename(ticker)
        # Duplicate dates would make the concat below fail; keep the last.
        col = col[~col.index.duplicated(keep='last')]
        columns.append(col)
    if not columns:
        return pd.DataFrame()
    # Require aligned dates: drop any date missing a price for some ticker.
    return pd.concat(columns, axis=1).dropna(how='any').sort_index()


def returns_from_prices(price_df):
    """Return simple period-over-period returns for aligned prices.

    ``fill_method=None`` avoids pandas' deprecated implicit forward-fill, so
    gaps are never turned into artificial zero returns.
    """
    return price_df.pct_change(fill_method=None).dropna()


def _infer_periods_per_year(index):
    """Guess the number of observations per year from median date spacing.

    Falls back to ``TRADING_DAYS_PER_YEAR`` when the index cannot be read as
    dates or has fewer than two entries. Spacing of up to four days is
    treated as daily data.
    """
    try:
        gaps = pd.Series(pd.to_datetime(index)).diff().dropna()
    except (TypeError, ValueError):
        return TRADING_DAYS_PER_YEAR
    if gaps.empty:
        return TRADING_DAYS_PER_YEAR
    median_days = gaps.median() / pd.Timedelta(days=1)
    if median_days <= 4:
        return TRADING_DAYS_PER_YEAR
    if median_days <= 10:
        return 52
    if median_days <= 45:
        return 12
    if median_days <= 135:
        return 4
    return 1


def portfolio_performance(weights, mean_rets, cov_matrix):
    """Return ``(expected_return, volatility)`` of a weighted portfolio."""
    weights = np.asarray(weights, dtype=float)
    port_ret = float(np.dot(weights, mean_rets))
    variance = float(np.dot(weights, np.dot(cov_matrix, weights)))
    # Rounding can push a near-zero variance slightly negative.
    port_vol = float(np.sqrt(max(variance, 0.0)))
    return port_ret, port_vol


def _clean_weights(raw):
    """Clip solver output to [0, 1] and renormalise so the weights sum to 1."""
    w = np.clip(np.asarray(raw, dtype=float), 0.0, 1.0)
    total = w.sum()
    return w / total if total > 0 else w


def _long_only_setup(n):
    """Return ``(bounds, x0)`` for an n-asset long-only, equal-weight start."""
    if n == 0:
        raise ValueError("Optimization failed: no assets provided")
    bounds = tuple((0.0, 1.0) for _ in range(n))
    return bounds, np.repeat(1 / n, n)


def min_variance_for_target(mean_rets, cov, target_ret):
    """Return long-only weights minimising variance at a target return.

    Raises:
        ValueError: if there are no assets or SLSQP reports failure.
    """
    mean_rets = np.asarray(mean_rets, dtype=float)
    bounds, x0 = _long_only_setup(len(mean_rets))
    # Constraints: weights sum to 1, portfolio return == target_ret.
    constraints = (
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
        {'type': 'eq', 'fun': lambda w: np.dot(w, mean_rets) - target_ret},
    )

    def portfolio_var(w, cov):
        return np.dot(w.T, np.dot(cov, w))

    res = minimize(portfolio_var, x0, args=(cov,), method='SLSQP',
                   bounds=bounds, constraints=constraints,
                   options={'maxiter': 1000})
    if not res.success:
        raise ValueError("Optimization failed: " + str(res.message))
    return _clean_weights(res.x)


def max_sharpe_portfolio(mean_rets, cov, rf=0.0):
    """Return long-only weights maximising the Sharpe ratio.

    Args:
        mean_rets: Expected returns per asset.
        cov: Covariance matrix of returns.
        rf: Risk-free rate in the same units as ``mean_rets``.

    Raises:
        ValueError: if there are no assets or SLSQP reports failure.
    """
    mean_rets = np.asarray(mean_rets, dtype=float)
    bounds, x0 = _long_only_setup(len(mean_rets))
    constraints = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1},)

    def neg_sharpe(w, mean_rets, cov, rf):
        p_ret = np.dot(w, mean_rets)
        p_vol = np.sqrt(np.dot(w.T, np.dot(cov, w)))
        return -(p_ret - rf) / (p_vol or _MIN_VOL)

    res = minimize(neg_sharpe, x0, args=(mean_rets, cov, rf), method='SLSQP',
                   bounds=bounds, constraints=constraints,
                   options={'maxiter': 1000})
    if not res.success:
        raise ValueError("Optimization failed: " + str(res.message))
    return _clean_weights(res.x)


def efficient_frontier(mean_rets, cov, points=20):
    """Trace the long-only efficient frontier.

    Target returns are spaced evenly between the smallest and largest asset
    mean return; targets whose optimisation fails are skipped.

    Returns:
        list of ``(return, volatility, weights)`` tuples, possibly empty.
    """
    n_points = max(int(points), 0)
    targets = np.linspace(float(np.min(mean_rets)), float(np.max(mean_rets)),
                          n_points)
    frontier = []
    for tr in targets:
        try:
            w = min_variance_for_target(mean_rets, cov, tr)
        except ValueError:
            # Best effort: an infeasible or non-converged target is skipped.
            continue
        r, v = portfolio_performance(w, mean_rets, cov)
        frontier.append((r, v, w))
    return frontier


def save_weights_csv(tickers, weights):
    """Write weights to a temporary CSV (largest first) and return its path.

    The file is created with ``delete=False`` so it outlives this call; the
    handle itself is closed before returning.
    """
    df = pd.DataFrame(
        {'ticker': list(tickers), 'weight': np.round(weights, 6)}
    ).sort_values('weight', ascending=False)
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix=".csv", prefix="weights_",
        newline='', encoding='utf-8',
    ) as tmp:
        df.to_csv(tmp, index=False)
    return tmp.name


def _fail(message):
    """Return the 5-tuple the UI expects when only a message is available."""
    return message, None, None, None, None


# --- Main function for Gradio ---
def run_optimizer(uploaded_csv, tickers_text, period, interval, opt_mode,
                  rf_pct, frontier_points):
    """Load prices, run the selected optimisation, and build the UI outputs.

    Args:
        uploaded_csv: Uploaded file object/path, or None. Takes precedence
            over ``tickers_text`` when provided.
        tickers_text: Comma-separated tickers to fetch when no CSV is given.
        period: yfinance period used when fetching.
        interval: yfinance interval used when fetching.
        opt_mode: One of ``'Max Sharpe'``, ``'Min Variance (target return)'``
            or ``'Efficient Frontier'``.
        rf_pct: Annual risk-free rate in percent; None is treated as 0.
        frontier_points: Number of frontier targets; None is treated as 20.

    Returns:
        Tuple ``(text, main_figure, weights_csv_path, second_figure, None)``.
        On any failure the text carries the message and the rest are None.
    """
    # Load price data
    if uploaded_csv is not None:
        try:
            path = uploaded_csv.name if hasattr(uploaded_csv, 'name') else uploaded_csv
            price_dfs = read_price_csv(path)
        except Exception as e:
            return _fail(f"CSV read error: {e}")
    elif tickers_text and tickers_text.strip():
        try:
            price_dfs = fetch_price_data_from_tickers(
                tickers_text, period=period, interval=interval)
        except Exception as e:
            return _fail(f"Ticker fetch error: {e}")
    else:
        return _fail("Provide tickers or upload a price CSV.")

    if not price_dfs:
        return _fail("No price series found for the provided inputs.")

    price_matrix = build_price_matrix(price_dfs)
    if price_matrix.empty or price_matrix.shape[1] < 2:
        return _fail("Need at least 2 aligned price series (columns) to optimize.")

    rets = returns_from_prices(price_matrix)
    if len(rets) < 2:
        # A covariance matrix needs at least two return observations.
        return _fail("Need at least 3 aligned price observations to estimate returns.")

    # Annualize with a factor matching the data spacing (daily/weekly/...).
    periods_per_year = _infer_periods_per_year(price_matrix.index)
    mean_rets = rets.mean().values * periods_per_year
    cov = rets.cov().values * periods_per_year
    symbols = list(price_matrix.columns)

    rf_shown = 0.0 if rf_pct is None else rf_pct
    labels = {'x': 'Ticker', 'y': 'Weight'}

    try:
        rf = float(rf_shown) / 100.0

        if opt_mode == 'Max Sharpe':
            w = max_sharpe_portfolio(mean_rets, cov, rf=rf)
            p_ret, p_vol = portfolio_performance(w, mean_rets, cov)
            sharpe = (p_ret - rf) / (p_vol or _MIN_VOL)
            weights_csv = save_weights_csv(symbols, w)
            alloc_fig = px.bar(x=symbols, y=w, labels=labels,
                               title='Portfolio Allocation')
            perf_text = f"Expected annual return: {p_ret:.2%}\nExpected annual vol: {p_vol:.2%}\nSharpe (rf={rf_shown}%): {sharpe:.3f}"
            return perf_text, alloc_fig, weights_csv, None, None

        if opt_mode == 'Min Variance (target return)':
            # Target is the average of the assets' annualized mean returns.
            target = float(np.mean(mean_rets))
            w = min_variance_for_target(mean_rets, cov, target)
            p_ret, p_vol = portfolio_performance(w, mean_rets, cov)
            weights_csv = save_weights_csv(symbols, w)
            alloc_fig = px.bar(x=symbols, y=w, labels=labels,
                               title='Portfolio Allocation (Min Variance)')
            perf_text = f"Target return: {target:.2%}\nExpected annual vol: {p_vol:.2%}"
            return perf_text, alloc_fig, weights_csv, None, None

        if opt_mode == 'Efficient Frontier':
            n_points = 20 if frontier_points is None else int(frontier_points)
            frontier = efficient_frontier(mean_rets, cov, points=n_points)
            if not frontier:
                return _fail("Failed to compute efficient frontier.")
            rets_f = [r for r, _, _ in frontier]
            vols_f = [v for _, v, _ in frontier]
            fig = go.Figure()
            fig.add_trace(go.Scatter(x=vols_f, y=rets_f, mode='lines+markers',
                                     name='Efficient Frontier'))
            fig.update_layout(title='Efficient Frontier (Annualized)',
                              xaxis_title='Volatility', yaxis_title='Return')
            # Also show the allocation of the frontier point with the highest
            # Sharpe ratio, measured against the risk-free rate.
            best_ret, best_vol, best_w = max(
                frontier, key=lambda p: (p[0] - rf) / (p[1] or _MIN_VOL))
            alloc_fig = px.bar(x=symbols, y=best_w, labels=labels,
                               title='Allocation at chosen frontier point (max Sharpe)')
            weights_csv = save_weights_csv(symbols, best_w)
            perf_text = f"Frontier points: {len(rets_f)}. Selected point expected return {best_ret:.2%}, vol {best_vol:.2%}."
            return perf_text, fig, weights_csv, alloc_fig, None

        return _fail("Unknown optimization mode.")
    except Exception as e:
        # UI boundary: surface any failure as text instead of crashing.
        return _fail(f"Optimization error: {e}")


# --- Gradio UI ---
with gr.Blocks(title='Portfolio Optimizer (Mean-Variance)') as demo:
    gr.Markdown("## 📈 Portfolio Optimizer — Mean-Variance (Hugging Face Space)\nUpload historical prices CSV or provide tickers to fetch prices via yfinance.")
    with gr.Row():
        with gr.Column(scale=2):
            uploaded = gr.File(label='Upload price CSV (Date + columns per ticker)', file_types=['.csv'])
            tickers = gr.Textbox(label='Or paste tickers (comma-separated)', value='AAPL, MSFT, NVDA')
            period = gr.Dropdown(choices=['3mo', '6mo', '1y', '2y', '5y'], value='1y', label='Fetch period (if using tickers)')
            interval = gr.Dropdown(choices=['1d', '1wk', '1mo'], value='1d', label='Fetch interval (if using tickers)')
            opt_mode = gr.Radio(choices=['Max Sharpe', 'Min Variance (target return)', 'Efficient Frontier'], value='Max Sharpe', label='Optimization mode')
            rf_pct = gr.Number(value=0.0, label='Risk-free rate (annual %, e.g., 2.0)')
            frontier_points = gr.Number(value=20, label='Frontier points (Eff. Frontier mode)', precision=0)
            run = gr.Button('Run Optimization')
        with gr.Column(scale=3):
            perf_out = gr.Textbox(label='Portfolio performance / notes', interactive=False)
            fig_out = gr.Plot()
            alloc_fig_out = gr.Plot()
            weights_file = gr.File(label='Download weights CSV')
            # run_optimizer returns a fifth value that is always None; give it
            # a hidden sink rather than a stray empty textbox on the page.
            unused_out = gr.Textbox(visible=False)
    run.click(
        fn=run_optimizer,
        inputs=[uploaded, tickers, period, interval, opt_mode, rf_pct, frontier_points],
        outputs=[perf_out, fig_out, weights_file, alloc_fig_out, unused_out],
    )

if __name__ == '__main__':
    demo.launch()