# =============================================== # ADAPTIVE META-CONTROLLER MAIN LOOP (V1) # Drop-in for app.py — replaces your main_worker. # =============================================== import os import csv import time import math import random from collections import deque, defaultdict import pandas as pd # ------------------ Meta Components ------------------ class PerformanceLogger: \"\"\"Append signals and outcomes to a CSV for meta-learning and replay.\"\"\" def __init__(self, path=\"/mnt/data/agent_signals_log.csv\"): self.path = path header = [\"timestamp\",\"strategy\",\"action\",\"entry\",\"stop_loss\",\"take_profit\",\"price_at_signal\",\"eval_time\",\"pnl\",\"reward\",\"context_hash\"] if not os.path.exists(self.path): with open(self.path, \"w\", newline='') as f: writer = csv.writer(f) writer.writerow(header) def log_signal(self, ts, strategy, action, entry, sl, tp, price, eval_time, context_hash): with open(self.path, \"a\", newline='') as f: writer = csv.writer(f) writer.writerow([ts, strategy, action, entry, sl, tp, price, eval_time, \"\", \"\", context_hash]) def update_outcome(self, ts, pnl, reward): rows = [] filled = False with open(self.path, \"r\", newline='') as f: rows = list(csv.reader(f)) for i in range(len(rows)-1, 0, -1): if rows[i][0] == ts and rows[i][8] == \"\": rows[i][8] = f\"{pnl:.6f}\" rows[i][9] = f\"{reward:.6f}\" filled = True break if filled: with open(self.path, \"w\", newline='') as f: writer = csv.writer(f) writer.writerows(rows) class PageHinkley: \"\"\"Page-Hinkley change detector for streaming losses/returns.\"\"\" def __init__(self, delta=0.0001, lambda_=40, alpha=1-1e-3): self.mean = 0.0 self.delta = delta self.lambda_ = lambda_ self.alpha = alpha self.cumulative = 0.0 def update(self, x): # x: score (e.g., negative pnl or error) self.mean = self.mean * self.alpha + x * (1 - self.alpha) self.cumulative = min(self.cumulative + x - self.mean - self.delta, 0) if -self.cumulative > self.lambda_: self.cumulative = 0 return True return False class ThompsonBandit: \"\"\"Thompson sampling bandit with Bernoulli reward (win/loss).\"\"\" def __init__(self, strategies): self.strategies = list(strategies) self.success = {s: 1 for s in self.strategies} # Beta(1,1) priors self.fail = {s: 1 for s in self.strategies} def select(self, context=None): samples = {s: random.betavariate(self.success[s], self.fail[s]) for s in self.strategies} return max(samples, key=samples.get) def update(self, strategy, reward_binary): if reward_binary >= 1: self.success[strategy] += 1 else: self.fail[strategy] += 1 class StrategyManager: \"\"\"Wrap strategies with a uniform callable interface.\"\"\" def __init__(self, situation_room, extra_strategies=None): self.situation_room = situation_room self.extra = extra_strategies or {} def list_strategies(self): # Provide your canonical rule-based strategy # The exact signature for generate_thesis may differ in your code. def rule_based(seq): # You may customize how horizons/params are passed from your BEST_PARAMS return self.situation_room.generate_thesis({}, seq) all_strat = {\"rule_based\": rule_based} all_strat.update(self.extra) return all_strat # ------------------ Small helpers ------------------ def context_hash_from_df(df): r = df.iloc[-1] keys = [k for k in [\"close\",\"ATR\",\"EMA_20\",\"RSI\",\"session_london\"] if k in r.index] vals = [f\"{r[k]:.6f}\" for k in keys] return \"_\".join(vals) if vals else f\"{float(r.get('close', 0.0)):.6f}\" def fetch_current_price_or_last(seq): try: return float(seq.iloc[-1]['close']) except Exception: return float(seq['close'].iloc[-1]) # ------------------ Evaluation pass ------------------ def evaluate_pending_signals(perf_logger_path, bandit, change_detector, price_fetch_seq): now = pd.Timestamp.now(tz='UTC') rows = [] updated = False try: with open(perf_logger_path, \"r\", newline='') as f: rows = list(csv.reader(f)) except FileNotFoundError: return for i in range(1, len(rows)): if rows[i][8] != \"\": # already evaluated continue eval_time_str = rows[i][7] try: eval_time = pd.to_datetime(eval_time_str) except Exception: continue if eval_time <= now: strategy = rows[i][1]; action = rows[i][2] try: entry = float(rows[i][3]) except Exception: continue price_now = fetch_current_price_or_last(price_fetch_seq()) pnl = (price_now - entry) if action == \"BUY\" else (entry - price_now) reward = 1.0 if pnl > 0 else 0.0 rows[i][8] = f\"{pnl:.6f}\" rows[i][9] = f\"{reward:.6f}\" bandit.update(strategy, reward) _ = change_detector.update(-pnl) updated = True if updated: with open(perf_logger_path, \"w\", newline='') as f: writer = csv.writer(f) writer.writerows(rows) # ------------------ Bootstrap dependencies ------------------ def bootstrap_components(symbol): \"\"\"Create or load your core app components. If your app constructs these elsewhere, replace this with imports/uses of your instances. \"\"\" # Prediction engine: assumes a class PredictionEngine() exists in your app try: pred_engine = PredictionEngine(symbol=symbol) except Exception: pred_engine = None # If you don't have it or construct elsewhere # Situation room & regime filter try: sr = RuleBasedSituationRoom(BEST_PARAMS) except Exception: sr = RuleBasedSituationRoom({}) try: rf = MarketRegimeFilter() except Exception: class _DummyRF: def should_trade(self, regime, thesis): return True rf = _DummyRF() return pred_engine, sr, rf # ------------------ NEW main_worker ------------------ def main_worker(symbol: str, ntfy_topic: str, poll_interval_seconds: int = 60, lookback_minutes: int = 240, eval_horizon_minutes: int = 30): \"\"\"Adaptive, self-evaluating main loop. Replaces your existing main_worker. Safe to run in paper mode. \"\"\" pred_engine, situation_room, regime_filter = bootstrap_components(symbol) strategy_manager = StrategyManager(situation_room, extra_strategies={ # Example alt strategy: a tiny scalp variant built on top of your situation room. \"scalp\": lambda seq: situation_room.generate_thesis({}, seq) }) bandit = ThompsonBandit(strategy_manager.list_strategies().keys()) perf_logger = PerformanceLogger() change_detector = PageHinkley(delta=0.0001, lambda_=40) def _price_seq_provider(): # Replace with your data fetcher to get the latest window return fetch_latest_sequence(symbol, lookback_minutes) print(\"[Adaptive] main_worker started.\") while True: try: # 1) Fetch latest window + build features input_sequence = _price_seq_provider() if input_sequence is None or len(input_sequence) < 10: time.sleep(poll_interval_seconds); continue features = create_feature_set_for_inference(input_sequence) # 2) Predict (optional): if you have a prediction_engine, use it to enrich features if pred_engine is not None and hasattr(pred_engine, \"predict\"): try: _ = pred_engine.predict(features) except Exception as _e: pass # 3) Regime classification (optional): if you have a function, call it; else set default current_regime = \"normal\" # 4) Strategy selection and signal available = strategy_manager.list_strategies() chosen_name = bandit.select(context=None) trade_thesis = available[chosen_name](features) is_tradeable = True try: is_tradeable = regime_filter.should_trade(current_regime, trade_thesis) except Exception: pass final_action = trade_thesis.get('action', 'NO ACTION') if not is_tradeable: final_action = \"NO TRADE (FILTERED)\" # 5) Log signal for later evaluation ts = str(pd.Timestamp.now(tz='UTC')) context_hash = context_hash_from_df(features) if final_action in [\"BUY\", \"SELL\"]: perf_logger.log_signal( ts, chosen_name, final_action, trade_thesis.get('entry', features.iloc[-1]['close']), trade_thesis.get('stop_loss', None), trade_thesis.get('take_profit', None), float(features.iloc[-1]['close']), (pd.Timestamp.now(tz='UTC') + pd.Timedelta(minutes=eval_horizon_minutes)).isoformat(), context_hash ) # Notify try: send_ntfy_notification(ntfy_topic, trade_thesis | {\"strategy\": chosen_name}) except Exception: pass # 6) Evaluate pending signals (shadow P&L) evaluate_pending_signals(perf_logger.path, bandit, change_detector, _price_seq_provider) # 7) Optional: trigger fine-tune on drift # You can check the internal state of change_detector if you adapt the class to expose flags. time.sleep(poll_interval_seconds) except KeyboardInterrupt: print(\"[Adaptive] Stopping main_worker.\") break except Exception as e: # Keep the loop resilient print(f\"[Adaptive] Loop error: {e}\") time.sleep(poll_interval_seconds)