# ======================== (A) 导入库和配置环境 ======================== import pandas as pd import numpy as np import time import warnings import os import logging import tempfile import traceback # 数据分析与建模 from scipy import stats from statsmodels.tsa.stattools import adfuller from statsmodels.tsa.seasonal import STL from statsmodels.stats.diagnostic import acorr_ljungbox from prophet import Prophet import pmdarima as pm from sklearn.metrics import mean_absolute_error, mean_squared_error from joblib import Parallel, delayed # 可视化 import matplotlib.pyplot as plt import seaborn as sns # Web UI import gradio as gr # --- 全局设置 --- warnings.filterwarnings("ignore") logging.getLogger('prophet').setLevel(logging.ERROR) logging.getLogger('cmdstanpy').setLevel(logging.ERROR) # 配置中文字体 (使用 packages.txt 安装的字体) plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei'] plt.rcParams['axes.unicode_minus'] = False # --- 输出文件夹设置 --- OUTPUT_DIR = 'outputs' if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR) # ======================== (B) 辅助函数 ======================== def calculate_metrics(actual, predicted): metrics_df = pd.DataFrame({'actual': actual, 'predicted': predicted}).dropna() if metrics_df.empty: return {'MAE': np.nan, 'RMSE': np.nan, 'MAPE': np.nan, 'sMAPE': np.nan} clean_actual, clean_predicted = metrics_df['actual'], metrics_df['predicted'] mae = mean_absolute_error(clean_actual, clean_predicted) rmse = np.sqrt(mean_squared_error(clean_actual, clean_predicted)) actual_safe = np.where(clean_actual == 0, 1e-6, clean_actual) mape = np.mean(np.abs((clean_actual - clean_predicted) / actual_safe)) * 100 smape = 200 * np.mean(np.abs(clean_actual - clean_predicted) / (np.abs(clean_actual) + np.abs(clean_predicted))) return {'MAE': mae, 'RMSE': rmse, 'MAPE': mape, 'sMAPE': smape} # ======================== (C) 主分析函数 (Gradio核心) ======================== def run_full_analysis(progress=gr.Progress(track_tqdm=True)): """ 这个主函数封装了所有的分析步骤,并通过 yield 返回结果来实时更新Gradio界面。 """ # --- 1. 初始化 --- log_lines = ["## 🚀 数据分析流程已启动..."] figure_paths = [] final_report_text = "" report_file_path = None # 辅助函数,用于将Matplotlib Figure保存为临时图片文件并返回路径 def save_fig_to_path(fig): # 使用 NamedTemporaryFile 来创建一个不会被立即删除的临时文件 with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmpfile: fig.savefig(tmpfile.name) figure_paths.append(tmpfile.name) plt.close(fig) # 操作完成后关闭图形,释放内存 # 辅助函数,用于更新UI状态 def update_ui(new_log_line=None): if new_log_line: log_lines.append(new_log_line) # [log, gallery, final_report, download_button] return "\n\n".join(log_lines), figure_paths, final_report_text, report_file_path yield update_ui() # 立即显示启动信息 try: # --- 2. 数据清洗 --- log_lines.append("### 1. 数据清洗") df = pd.read_excel('gmqrkl.xlsx') df['Date'] = pd.to_datetime(df['Date']) df = df.drop_duplicates(subset=['Date']).sort_values('Date').reset_index(drop=True) log_lines.append(f"✅ 数据读取并去重成功,共 {len(df)} 行。") df['Value'] = df['Value'].replace(0, np.nan) df['Value'].interpolate(method='linear', limit_direction='both', inplace=True) log_lines.append("✅ 零值替换与线性插值完成。") df.set_index('Date', inplace=True) ts_data = df['Value'] yield update_ui() # --- 3. 平稳性检验与差分 --- log_lines.append("### 2. 平稳性检验与差分") current_data = ts_data.dropna() adf_result = adfuller(current_data) if adf_result[1] < 0.05: log_lines.append(f"✅ 序列在 d=0 阶差分后达到平稳 (p={adf_result[1]:.4f})。") d_order = 0 else: current_data_diff = current_data.diff().dropna() adf_result_diff = adfuller(current_data_diff) d_order = 1 if adf_result_diff[1] < 0.05: log_lines.append(f"✅ 序列在 d=1 阶差分后达到平稳 (p={adf_result_diff[1]:.4f})。") else: log_lines.append(f"⚠️ 1阶差分后仍未平稳 (p={adf_result_diff[1]:.4f}),将使用 d=1 继续分析。") current_data = current_data_diff ts_stationary = current_data yield update_ui() # --- 4. 白噪声检验 --- log_lines.append("### 3. 白噪声检验") lags = min(10, len(ts_stationary) // 5) lb_test_result = acorr_ljungbox(ts_stationary, lags=[lags], return_df=True) if lb_test_result['lb_pvalue'].iloc[0] > 0.05: log_lines.append(f"⚠️ 序列可能是白噪声(p-value = {lb_test_result['lb_pvalue'].iloc[0]:.4f}),模型可能无效。") else: log_lines.append(f"✅ 通过白噪声检验 (p-value = {lb_test_result['lb_pvalue'].iloc[0]:.4f}),可以进行后续建模。") yield update_ui() # --- 5. 季节性检验与分解 --- log_lines.append("\n### 4. 季节性检验与STL分解") period = 365 seasonal_enabled = len(ts_data) > 2 * 14 # 数据多于两周则开启周季节性 m_period = 7 if seasonal_enabled else 1 log_lines.append(f"✅ 季节性参数设定: m={m_period}, seasonal={seasonal_enabled}") if len(ts_data) >= 2 * period: seasonal_period_for_stl = period if period % 2 != 0 else period + 1 log_lines.append(f"✅ 准备进行STL分解,周期(period)={period},季节平滑窗口(seasonal)={seasonal_period_for_stl}。") yield update_ui() stl = STL(ts_data, period=period, seasonal=seasonal_period_for_stl) res = stl.fit() fig = res.plot() fig.set_size_inches(12, 8) fig.suptitle(f'STL 分解图 (周期={period})', fontsize=16, y=0.98) plt.tight_layout() save_fig_to_path(fig) log_lines.append("✅ STL分解图已生成。") else: log_lines.append("⚠️ 数据长度不足以进行年度季节性分解。") yield update_ui() # --- 6. 混合策略回测优化窗口大小 --- log_lines.append("\n### 5. 优化训练窗口大小") log_lines.append("⏳ **此步骤计算量大,可能需要5-15分钟,请耐心等待...**") yield update_ui() def evaluate_window_hybrid(window_size, time_series, d, m, seasonal): errors = [] series_values = time_series.values backtest_length = 100 if len(series_values) <= window_size + backtest_length: return {'window_size': window_size, 'mae': np.inf} end_index = len(series_values) start_index = end_index - backtest_length for i in range(start_index, end_index): train_window = pd.Series(series_values[i-window_size:i], index=time_series.index[i-window_size:i]) test_point = series_values[i] use_seasonal = seasonal and (len(train_window) >= 2 * m) try: model = pm.auto_arima(train_window, d=d, seasonal=use_seasonal, m=m, max_p=2, max_q=2, stepwise=True, trace=False, error_action='ignore', suppress_warnings=True) forecast = model.predict(n_periods=1)[0] errors.append(test_point - forecast) except Exception: continue if not errors: return {'window_size': window_size, 'mae': np.inf} return {'window_size': window_size, 'mae': np.mean(np.abs(errors))} window_sizes_to_test = np.arange(70, 211, 14) with Parallel(n_jobs=-1) as parallel: results = parallel( delayed(evaluate_window_hybrid)(ws, ts_data, d_order, m_period, seasonal_enabled) for ws in window_sizes_to_test ) window_results_df = pd.DataFrame(results).sort_values('mae').set_index('window_size') if not window_results_df.empty and np.isfinite(window_results_df['mae'].min()): best_window_size = window_results_df['mae'].idxmin() log_lines.append(f"✅ **窗口优化完成!** 基于MAE,最佳训练窗口大小为: **{best_window_size}** 天。") else: best_window_size = 90 log_lines.append(f"⚠️ 窗口优化失败,使用默认窗口大小: {best_window_size} 天。") fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(window_results_df.index, window_results_df['mae'], marker='o', label='MAE') ax.set_title('训练窗口大小对预测误差的影响') ax.set_xlabel('训练窗口天数'); ax.set_ylabel('误差值'); ax.legend(); ax.grid(True) save_fig_to_path(fig) yield update_ui() # --- 7 & 8. 动态滚动预测与评估 --- log_lines.append("\n### 6. 动态滚动预测与评估") log_lines.append("⏳ **此步骤同样耗时,正在进行模型滚动预测...**") yield update_ui() split_point_roll = int(len(ts_data) * 0.8) test_rolling_target = ts_data.iloc[split_point_roll:] rolling_predictions = {} # SARIMA 滚动 sarima_rolling_preds = [] for i in progress.tqdm(range(len(test_rolling_target)), desc="SARIMA Rolling Forecast"): train_window = ts_data.iloc[split_point_roll + i - best_window_size : split_point_roll + i] try: model = pm.auto_arima(train_window, d=d_order, m=m_period, seasonal=seasonal_enabled, stepwise=True, trace=False, error_action='ignore', suppress_warnings=True) sarima_rolling_preds.append(model.predict(n_periods=1)[0]) except: sarima_rolling_preds.append(sarima_rolling_preds[-1] if sarima_rolling_preds else np.nan) rolling_predictions['Auto-SARIMA'] = pd.Series(sarima_rolling_preds, index=test_rolling_target.index).ffill() log_lines.append("✅ Auto-SARIMA 滚动预测完成。") yield update_ui() # Prophet 滚动 prophet_rolling_preds = [] prophet_model = None for i, (date, value) in enumerate(progress.tqdm(test_rolling_target.items(), desc="Prophet Rolling Forecast")): if i % 14 == 0 or prophet_model is None: train_upto_date = ts_data.loc[:date - pd.Timedelta(days=1)] prophet_train_df = train_upto_date.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'}) prophet_model = Prophet(yearly_seasonality='auto', weekly_seasonality=seasonal_enabled, daily_seasonality=False).fit(prophet_train_df) future_df = pd.DataFrame({'ds': [date]}) forecast = prophet_model.predict(future_df) prophet_rolling_preds.append(forecast['yhat'].iloc[0]) rolling_predictions['Prophet'] = pd.Series(prophet_rolling_preds, index=test_rolling_target.index) log_lines.append("✅ Prophet 滚动预测完成。") # 评估 rolling_evaluation_results = {name: calculate_metrics(test_rolling_target, preds) for name, preds in rolling_predictions.items()} rolling_evaluation_df = pd.DataFrame(rolling_evaluation_results).T.sort_values(by='MAE') best_rolling_model_name = rolling_evaluation_df.index[0] log_lines.append("\n**滚动预测性能对比:**") log_lines.append(f"```\n{rolling_evaluation_df.to_markdown()}\n```") log_lines.append(f"\n==> ✅ 最佳滚动预测模型是: **{best_rolling_model_name}**") yield update_ui() # --- 9. 滚动预测可视化 --- log_lines.append("\n### 7. 生成结果图表") fig, ax = plt.subplots(figsize=(15, 8)) ax.plot(ts_data, label='历史数据', color='gray', alpha=0.5) ax.plot(test_rolling_target, label='真实值 (测试集)', color='blue', linewidth=2) for model_name, preds in rolling_predictions.items(): is_best = ' (最佳)' if model_name == best_rolling_model_name else '' ax.plot(preds.dropna(), label=f'{model_name} 预测{is_best}', linestyle='--') ax.set_title('滚动预测结果对比'); ax.legend(); ax.grid(True) save_fig_to_path(fig) yield update_ui() # --- 10. 最终未来预测 --- forecast_horizon = 90 log_lines.append(f"\n### 8. 使用最佳模型 `{best_rolling_model_name}` 预测未来 {forecast_horizon} 天") # 训练最终模型 if 'Auto-SARIMA' in best_rolling_model_name: final_train_data = ts_data.iloc[-best_window_size:] final_model = pm.auto_arima(final_train_data, d=d_order, m=m_period, seasonal=seasonal_enabled, stepwise=True, trace=False, error_action='ignore', suppress_warnings=True) final_forecast_values, conf_int = final_model.predict(n_periods=forecast_horizon, return_conf_int=True) else: # Prophet final_train_data = ts_data final_prophet_train_df = final_train_data.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'}) final_model = Prophet(yearly_seasonality='auto', weekly_seasonality=seasonal_enabled, daily_seasonality=False).fit(final_prophet_train_df) future_df = final_model.make_future_dataframe(periods=forecast_horizon, freq='D') forecast_obj = final_model.predict(future_df) final_forecast_values = forecast_obj['yhat'].iloc[-forecast_horizon:].values conf_int = np.column_stack((forecast_obj['yhat_lower'].iloc[-forecast_horizon:].values, forecast_obj['yhat_upper'].iloc[-forecast_horizon:].values)) future_dates = pd.date_range(start=ts_data.index[-1] + pd.Timedelta(days=1), periods=forecast_horizon) final_forecast_series = pd.Series(final_forecast_values, index=future_dates) # 可视化最终预测 fig, ax = plt.subplots(figsize=(15, 8)) ax.plot(ts_data.tail(365), label='近期历史数据', color='blue') ax.plot(final_forecast_series, label=f'未来 {forecast_horizon} 天预测', color='red', linestyle='--') ax.fill_between(future_dates, conf_int[:, 0], conf_int[:, 1], color='red', alpha=0.2, label='95% 置信区间') ax.set_title(f'最终未来用量预测 (基于 {best_rolling_model_name})'); ax.legend(); ax.grid(True) save_fig_to_path(fig) # 生成最终报告 final_report_text = f""" # 药品用量预测分析报告 ## 1. 数据概览 - **数据时间范围**: {ts_data.index.min().strftime('%Y-%m-%d')} to {ts_data.index.max().strftime('%Y-%m-%d')} - **总数据点**: {len(ts_data)} - **平均用量**: {ts_data.mean():.2f} ## 2. 分析与建模参数 - **平稳性差分阶数 (d)**: {d_order} - **季节性周期 (m)**: {m_period} - **最佳训练窗口**: {best_window_size} 天 ## 3. 模型评估 (基于动态滚动预测) 通过在历史数据上进行滚动预测,我们能更真实地评估模型在实际应用中的表现。 {rolling_evaluation_df.to_markdown()} ## 4. 最终结论与未来预测 - **最佳模型**: **{best_rolling_model_name}** 被选为最终预测模型,因为它在滚动预测中表现最佳(MAE最低)。 - **未来预测**: 已使用 `{best_rolling_model_name}` 模型对未来 **{forecast_horizon}** 天的用量进行预测。 - **预测摘要**: - 未来一周平均日用量: **{final_forecast_series.head(7).mean():.2f}** - 未来一月平均日用量: **{final_forecast_series.head(30).mean():.2f}** """.strip() report_file_path = os.path.join(OUTPUT_DIR, 'final_analysis_report.txt') with open(report_file_path, 'w', encoding='utf-8') as f: f.write(final_report_text) log_lines.append("\n## 🎉 全部分析流程完成!请查看最终报告和图表。") yield update_ui() except Exception as e: log_lines.append(f"\n\n❌ **分析过程中断,出现错误:**\n`{str(e)}`") log_lines.append(f"\n**Traceback:**\n```{traceback.format_exc()}```") yield update_ui() # ======================== (D) Gradio 界面构建 ======================== with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as demo: gr.Markdown( """ # 📈 智能时序预测与分析平台 (动态回测版) 欢迎使用!请确保您的数据文件 `gmqrkl.xlsx` 已上传至本 Space 的文件库中。 然后,点击下方按钮,启动包含 **窗口优化** 和 **动态滚动预测** 的完整分析流程。 **注意:完整流程计算量大,可能需要10-20分钟。请耐心等待,并观察下方日志区的实时进度。** """ ) start_button = gr.Button("🚀 点击这里,开始完整分析", variant="primary") gr.Markdown("---") with gr.Tabs(): with gr.TabItem("📊 可视化图表", id=0): gallery_output = gr.Gallery(label="分析图表", elem_id="gallery", columns=[1], height="auto", object_fit="contain") with gr.TabItem("📝 实时分析日志", id=1): log_output = gr.Markdown("点击按钮后,分析日志将实时显示在这里...") with gr.TabItem("📋 最终报告与下载", id=2): final_report_output = gr.Markdown("分析完成后,最终报告将显示在这里。") download_output = gr.File(label="下载报告文件") start_button.click( fn=run_full_analysis, inputs=None, outputs=[log_output, gallery_output, final_report_output, download_output] ) gr.Markdown("
Powered by Gradio and Hugging Face Spaces.
") if __name__ == "__main__": demo.launch()