from flask import Flask, request, jsonify
import requests
import json
import time
import pytz
import logging
import threading
import re
from datetime import datetime, timezone
# Flask setup
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# External APIs
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message"
MESSAGE_API_KEY = "Seakp0683asppoit"
AI_API_URL = "https://corvo-ai-xx-gpt-5.hf.space/chat"
ALERT_API_URL = "https://dooratre-alert.hf.space/monitor"
TS_POINTS_API = "https://dooratre-tracker.hf.space/ts_points"
# New Chart Pro API
CHART_API_BASE = "https://corvo-ai-chart-pro.hf.space"
# Track API for scenarios
SCENARIO_TRACK_API = "https://dooratre-tracker.hf.space/track"
# Retries
MAX_RETRIES = 5
RETRY_DELAY = 30
# DB modules
import db_system
import db_signals
import db_analysis
import get_price
# ======= Globals (place near other globals) =======
analysis_cancel_flags = {} # key: session_id, value: True/False
# Indicator name to TradingView PUB ID map (extend this as needed)
INDICATOR_MAP = {
"FIBO": "STD;Auto%251Fib%251Retracement%251", # fixed
"ADX": "PUB;932",
"RSI": "STD;Divergence%251Indicator", # fixed
"VWAP": "STD;VWAP",
"EMA": "PUB;WzGi7PQBB1HQofcRJ0mq6vxEpIlsHWvw",
"BRCH": "PUB;8c2d234156044effa75d531d82b247b3",
# Add more mappings here...
}
def get_session_id_from_request(req):
try:
data = req.get_json(silent=True) or {}
except Exception:
data = {}
sid = (
data.get("session_id")
or req.args.get("session_id")
or req.headers.get("X-Session-Id")
or "default"
)
return str(sid)
def mark_analysis_cancelled(session_id):
analysis_cancel_flags[session_id] = True
def is_analysis_cancelled(session_id):
return analysis_cancel_flags.get(session_id, False)
def clear_analysis_cancel(session_id):
if session_id in analysis_cancel_flags:
del analysis_cancel_flags[session_id]
def send_message_to_api(message, max_retries=5, retry_delay=10):
headers = {"Content-Type": "application/json", "X-API-Key": MESSAGE_API_KEY}
payload = {"message": message}
for attempt in range(1, max_retries + 1):
try:
response = requests.post(MESSAGE_API_URL, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
logger.info(f"Message sent on attempt {attempt}")
return {"success": True, "response": response.json()}
else:
logger.warning(f"Attempt {attempt}: status {response.status_code}")
except requests.exceptions.RequestException as e:
logger.warning(f"Attempt {attempt}: request error: {e}")
if attempt < max_retries:
time.sleep(retry_delay)
logger.error("Max retries reached. Failed to send message.")
return {"success": False, "error": "Failed after multiple retries"}
def post_ts_points(tp_value, sl_value):
try:
payload = {"TP": str(tp_value), "SL": str(sl_value)}
r = requests.post(TS_POINTS_API, json=payload, timeout=20)
if r.status_code == 200:
return True, None
return False, f"status {r.status_code}"
except Exception as e:
return False, str(e)
def get_time_zones():
zones = {
"Greenwich": "UTC",
"London": "Europe/London",
"New York": "America/New_York",
"Tokyo": "Asia/Tokyo",
"Sydney": "Australia/Sydney"
}
times = {}
for city, zone in zones.items():
tz = pytz.timezone(zone)
current_time = datetime.now(tz)
times[city] = current_time.strftime("%Y-%m-%d %H:%M:%S %Z")
return times
def build_signal_timestamps():
tz_times = get_time_zones()
iso_utc = datetime.now(timezone.utc).isoformat()
return {"zones": tz_times, "iso_utc": iso_utc}
def get_live_prices_for_pairs():
pairs = ["XAUUSD"]
prices = {}
for p in pairs:
try:
data = get_price.get_live_rates_for_pair(p)
if data:
prices[p] = {
"bid": data.get("bid", "N/A"),
"ask": data.get("ask", "N/A"),
"difference": data.get("difference", "N/A")
}
else:
prices[p] = {"bid": "N/A", "ask": "N/A", "difference": "N/A"}
except Exception as e:
logger.warning(f"Price fetch failed for {p}: {e}")
prices[p] = {"bid": "N/A", "ask": "N/A", "difference": "N/A"}
return prices
def format_live_prices_text(prices_dict):
# Short, one-line per pair for attaching after image/user nudges
lines = []
for pair, obj in prices_dict.items():
bid = obj.get("bid", "N/A")
ask = obj.get("ask", "N/A")
diff = obj.get("difference", "N/A")
lines.append(f"{pair}: bid {bid}, ask {ask}, Δ {diff}")
return "\n".join(lines)
def to_float_safe(val):
try:
return float(str(val).strip())
except Exception:
return None
def parse_alert_block(alert_xml: str):
dur_match = re.search(r'(.*?)', alert_xml, re.DOTALL)
if not dur_match:
raise ValueError("duration_min missing in ")
try:
duration_minutes = int(str(dur_match.group(1)).strip())
if duration_minutes <= 0:
raise ValueError
except Exception:
raise ValueError("duration_min must be positive integer")
price_blocks = re.findall(r'(.*?)', alert_xml, re.DOTALL)
if not price_blocks:
raise ValueError("At least one ... block is required")
price_messages = []
for block in price_blocks:
msg_match = re.search(r'(.*?)', block, re.DOTALL)
message = ""
if msg_match:
message = msg_match.group(1).strip()
price_text = re.sub(r'.*?', '', block, flags=re.DOTALL).strip()
price_val = to_float_safe(price_text)
if price_val is None:
raise ValueError(f"Invalid price value in block: '{price_text}'")
price_messages.append({"price": price_val, "message": message})
return {"duration": duration_minutes, "price_messages": price_messages}
def build_monitor_payload_from_alert(parsed_alert: dict, symbol="XAUUSD"):
payload = {
"symbol": symbol,
"duration_minutes": parsed_alert["duration"],
"price_messages": parsed_alert["price_messages"]
}
return payload
def pair_number(val):
cleaned = re.sub(r'[^0-9\.\-]', '', val)
if cleaned in ('', '-', '.'):
return val.strip()
try:
num = float(cleaned)
s = f"{num:.2f}"
return s.rstrip('0').rstrip('.') if '.' in s else s
except:
return val.strip()
def count_xml_tags(ai_response):
counts = {
'scenario': len(re.findall(r'', ai_response)),
'user_messages': len(re.findall(r'', ai_response)),
'alerts': len(re.findall(r'', ai_response)),
'edits': len(re.findall(r'', ai_response)),
'final': len(re.findall(r'', ai_response))
}
return counts
def save_latest_final_analysis(final_text):
"""
Save ONLY the latest final analysis to db_analysis as an array: [ { ... } ]
"""
try:
record = {
"timestamp_utc": datetime.now(timezone.utc).isoformat(),
"response": final_text
}
# Wrap as array for saving
payload_list = [record]
payload_text = json.dumps(payload_list, ensure_ascii=False)
auth_token, commit_oid = db_analysis.fetch_authenticity_token_and_commit_oid()
if auth_token and commit_oid:
result = db_analysis.update_user_json_file(auth_token, commit_oid, payload_text)
return result.get("success", False)
logger.error("Failed to fetch auth or commit OID for final analysis saving.")
return False
except Exception as e:
logger.error(f"Error saving final analysis: {e}")
return False
def get_chart_screenshot(symbol="XAUUSD", exchange="OANDA", interval="15m", indicators=None, width=1920, height=1080, full_page=False):
if indicators is None:
indicators = [INDICATOR_MAP["EMA"]] # default
payload = {
"symbol": symbol,
"exchange": exchange,
"interval": interval,
"indicators": indicators,
"theme": "dark",
"width": "3840",
"height": "2160",
"fullPage": full_page
}
url = f"{CHART_API_BASE}/api/screenshot"
resp = requests.post(url, json=payload, timeout=90)
resp.raise_for_status()
return resp.json()
def load_system_prompt_from_files(has_active_signal: bool, has_scenario: bool):
"""
Returns system prompt string based on current state:
- If has_active_signal: use prompt_signal.txt
- Else (no active signal): use prompt_scenario.txt
If files are missing, fall back to a minimal default in Arabic.
"""
prompt_file = "prompt_signal.txt" if has_active_signal else "prompt_scenario.txt"
try:
with open(prompt_file, "r", encoding="utf-8") as f:
text = f.read().strip()
if text:
return text
except Exception as e:
logger.warning(f"Failed to load system prompt from {prompt_file}: {e}")
# Fallbacks
if has_active_signal:
return "وضع متابعة الصفقة: لا تنشئ سيناريو جديد. حلّل الصفقة الحالية فقط ويمكنك استخدام و و."
else:
return "وضع بناء السيناريو: حلّل وأنشئ سيناريو داخل يتضمن مع Buy/Sell و(@/SL/TP)."
def fetch_signals_raw():
"""
Returns:
{
"has_active_signal": bool,
"active_signal": list or None, # when active, it's a list with 1 object (normalized)
"has_scenario": bool,
"scenario": dict or None,
"raw": original
}
Accepts both legacy object and new array shapes, but normalizes in-memory to arrays when needed.
"""
out = {
"has_active_signal": False,
"active_signal": None,
"has_scenario": False,
"scenario": None,
"raw": None
}
try:
res = db_signals.fetch_json_from_github()
if res["success"] and res["data"]:
raw = res["data"][0]
out["raw"] = raw
# If array and first element has pair/type => active signal
if isinstance(raw, list) and raw and isinstance(raw[0], dict) and "pair" in raw[0] and "type" in raw[0]:
out["has_active_signal"] = True
out["active_signal"] = raw
# If object with "scenario" => scenario mode
elif isinstance(raw, dict) and "scenario" in raw:
out["has_scenario"] = True
out["scenario"] = raw["scenario"]
# Legacy: single signal object (not array) => treat as active signal
elif isinstance(raw, dict) and "pair" in raw and "type" in raw:
out["has_active_signal"] = True
out["active_signal"] = [raw]
except Exception as e:
logger.error(f"Error fetching signals/scenario: {e}")
return out
def save_scenario_object(scenario_obj):
"""
Save scenario to db_signals as an array: [ { "scenario": {...} } ]
"""
try:
payload_list = [{"scenario": scenario_obj}]
payload_text = json.dumps(payload_list, ensure_ascii=False)
auth_token, commit_oid = db_signals.fetch_authenticity_token_and_commit_oid()
if auth_token and commit_oid:
result = db_signals.update_user_json_file(auth_token, commit_oid, payload_text)
return result.get("success", False)
return False
except Exception as e:
logger.error(f"Error saving scenario: {e}")
return False
def post_scenario_to_tracker(buy_at, sell_at):
try:
payload = {"Buy": buy_at, "Sell": sell_at}
r = requests.post(SCENARIO_TRACK_API, json=payload, timeout=20)
if r.status_code == 200:
return True, None
return False, f"status {r.status_code}"
except Exception as e:
return False, str(e)
def build_initial_chat_history(alert_message=None):
chat_history = []
# Determine current state (active signal vs scenario/no state)
try:
state = fetch_signals_raw()
except Exception as e:
logger.error(f"Error determining state for system prompt: {e}")
state = {"has_active_signal": False, "has_scenario": False}
has_active = state.get("has_active_signal", False)
has_scen = state.get("has_scenario", False)
# Load system prompt from files based on state
try:
system_base_prompt = load_system_prompt_from_files(has_active, has_scen)
except Exception as e:
logger.error(f"Error loading system prompt: {e}")
system_base_prompt = "ابدأ التحليل وفق حالتك (صفقة نشطة أو سيناريو)."
# Fetch news summary from db_system and name it 'news'
news = ""
try:
system_data = db_system.fetch_json_from_github()
if system_data["success"] and system_data["data"]:
news = system_data["data"][0].get("response", "") or ""
except Exception as e:
logger.error(f"Error fetching news from db_system: {e}")
news = ""
# Build system turn (system prompt + time zones + news)
try:
times = get_time_zones()
time_info = "\n".join([f"{city}: {time}" for city, time in times.items()])
parts = [system_base_prompt, f"[Time Zones]\n{time_info}"]
if news.strip():
parts.append(f"[News]\n{news.strip()}")
system_full = "\n\n".join(parts)
chat_history.append({
"role": "system",
"content": system_full
})
except Exception as e:
logger.error(f"Error building system turn: {e}")
chat_history.append({
"role": "system",
"content": system_base_prompt
})
multipart_content = []
# Previous analysis (optional) - Read from db_analysis; supports array and legacy object
try:
analysis_data = db_analysis.fetch_json_from_github()
prev_text = ""
if analysis_data["success"] and analysis_data["data"]:
raw_obj = analysis_data["data"][0]
if isinstance(raw_obj, list) and raw_obj:
raw_text = raw_obj[-1].get("response", "")
elif isinstance(raw_obj, dict):
raw_text = raw_obj.get("response", "")
else:
raw_text = ""
prev_text = str(raw_text)[:1500]
if prev_text:
multipart_content.append({"type": "text", "text": f"LAST ANALYSIS HAPPEN :\n{prev_text}"})
except Exception as e:
logger.error(f"Error fetching previous analysis: {e}")
# Alert + current context (active signal or scenario or none)
try:
times = get_time_zones()
time_info = "\n".join([f"{city}: {time}" for city, time in times.items()])
prices_text = format_live_prices_text(get_live_prices_for_pairs())
message_content = ""
if alert_message:
message_content += f" ALERT MESSAGE: {alert_message}\n\n"
else:
message_content += "NO Any Message from ALERT\n\n"
if has_active:
sig = state["active_signal"][0]
message_content += (
"The user is currently in an active trade (one of the scenarios has been triggered):\n"
f"- Pair: {sig.get('pair','N/A')}\n"
f"- Type: {sig.get('type','N/A')}\n"
f"- Entry: {sig.get('entry','N/A')}\n"
f"- Stop Loss: {sig.get('stop_loss','N/A')}\n"
f"- Take Profit: {sig.get('take_profit','N/A')}\n"
"Important Instructions:\n"
"- Provide only ONE
at a time in the format:\n"
"
// you can use maxumum 3 indecators in same image\nExample
<15m>"
"- Use only if you think the conversation already has enough information to conclude.\n\nYou need put 5 images i this conversation start with first one"
f"Current Time:\n{time_info}\n\n"
f"Live Prices:\n{prices_text}"
)
elif has_scen:
sc = state["scenario"]
buy = sc.get("Buy", {})
sell = sc.get("Sell", {})
message_content += (
"There is a previously saved scenario that hasn’t been triggered yet. Creating a new scenario will replace the old one:\n"
f"- Buy: @={buy.get('at','N/A')}, SL={buy.get('SL','N/A')}, TP={buy.get('TP','N/A')}\n"
f"- Sell: @={sell.get('at','N/A')}, SL={sell.get('SL','N/A')}, TP={sell.get('TP','N/A')}\n\n"
"AI Guidance: Continue analyzing. If you want to update the scenario, send a with a new to replace it. If no new scenario is created, we will wait for one of the scenarios to be triggered.\n\n"
"Important Instructions:\n"
"- Provide only ONE
at a time:\n"
"
// you can use maxumum 3 indecators in same image\nExample
<15m>"
"- Use only if you believe there is enough information in the conversation.\n\nYou need put 5 images i this conversation start with first one"
f"Current Time:\n{time_info}\n\n"
f"Live Prices:\n{prices_text}"
)
else:
message_content += (
"No scenario or active trade exists (first run). Please analyze and create the first scenario within when done.\n\n"
"Important Instructions:\n"
"- Provide only ONE
at a time:\n"
"
// you can use maxumum 3 indecators in same image\nExample
<15m>"
"- make SL from 3$ to 5$ and TP from 7$ to 10$\n\n You need put 5 images i this conversation start with first one"
f"Current Time:\n{time_info}\n\n"
f"Live Prices:\n{prices_text}"
)
multipart_content.append({"type": "text", "text": message_content})
except Exception as e:
logger.error(f"Error building initial user content: {e}")
if multipart_content:
chat_history.append({
"role": "user",
"type": "multipart",
"content": multipart_content
})
else:
chat_history.append({
"role": "user",
"content": "No additional context available."
})
return chat_history
def call_o1_ai_api(formatted_chat_history, timeout=600):
headers = {"Content-Type": "application/json"}
payload = {"chat_history": formatted_chat_history}
for attempt in range(MAX_RETRIES):
try:
response = requests.post(AI_API_URL, headers=headers, data=json.dumps(payload), timeout=timeout)
response.raise_for_status()
assistant_response = response.json().get("assistant_response", "No response received.")
formatted_chat_history.append({"role": "assistant", "content": assistant_response})
return assistant_response, formatted_chat_history
except requests.exceptions.Timeout:
logger.warning(f"AI timeout attempt {attempt+1}, retrying...")
time.sleep(RETRY_DELAY)
except Exception as e:
logger.warning(f"AI error attempt {attempt+1}: {e}, retrying...")
time.sleep(RETRY_DELAY)
return "Error processing request. Please try again.", formatted_chat_history
def parse_img_request(ai_text):
m = re.search(r'
([\s\S]*?)', ai_text, re.IGNORECASE)
if not m:
return None
inner = m.group(1)
tokens = re.findall(r'<\s*([^<>]+?)\s*>', inner)
if not tokens:
return None
symbol = None
interval = None
indicators = []
# helpers
def is_timeframe(tok):
t = tok.strip().lower()
if t in ("d","w"):
return True
return bool(re.fullmatch(r'\d+[mh]', t)) # 1m,5m,15m,1h,4h
def normalize_timeframe(tok):
low = tok.strip().lower()
return low.upper() if low in ("d","w") else low
def is_symbol(tok):
return bool(re.fullmatch(r'[A-Z0-9_]{3,15}', tok.strip()))
# Pass 1: determine symbol as the FIRST token that looks like a symbol
for tok in tokens:
t = tok.strip()
if is_symbol(t):
symbol = t
break
# Default if none provided
if not symbol:
symbol = "XAUUSD"
# Pass 2: pick timeframe (first valid)
for tok in tokens:
t = tok.strip()
if is_timeframe(t):
interval = normalize_timeframe(t)
break
if not interval:
interval = "15m"
# Pass 3: indicators = tokens that are KNOWN in INDICATOR_MAP keys
known_inds = set(INDICATOR_MAP.keys())
for tok in tokens:
t = tok.strip()
# Skip if token is symbol or timeframe
if t == symbol or is_timeframe(t):
continue
# Only accept if token is a known indicator key
if t in known_inds:
indicators.append(t)
else:
logger.warning(f"Unknown token in
: '{t}' (ignored)")
# At least one indicator: if none valid, fallback to EMA
if not indicators:
indicators = ["EMA"]
return {"symbol": symbol, "interval": interval, "indicators": indicators}
def indicators_to_pub_ids(indicator_names):
ids = []
for name in indicator_names:
key = name.strip()
if key in INDICATOR_MAP:
ids.append(INDICATOR_MAP[key])
else:
logger.warning(f"Unknown indicator name '{key}', skipping.")
if not ids:
ids = [INDICATOR_MAP.get("EMA")]
# de-duplicate while preserving order
seen = set()
out = []
for i in ids:
if i and i not in seen:
out.append(i)
seen.add(i)
return out
def build_image_reply_user_turn(png_url):
prices_text = format_live_prices_text(get_live_prices_for_pairs())
# Add current times for all major zones
tz_times = get_time_zones()
time_info = "\n".join([f"{city}: {time}" for city, time in tz_times.items()])
content = [
{"type": "image", "url": png_url},
{"type": "text", "text": (
"📊 Your chart is ready for analysis.\n\n"
"⚠️ First: Analyze the image I just sent you before asking for any new one.\n"
"- If you need more confirmation → request **only ONE
** at a time for the next chart.\n"
" Example:
\n"
"- If you already have enough information → finish with .\n\n"
"🚫 Do NOT request multiple images at once.\n"
"🚫 If you use , don’t request another
after it.\n\n"
"Be smart with your analysis – choose indicators and timeframes like a pro. Now, go ahead with your analysis and tell me what’s image you need? or that enough ?."
)},
{"type": "text", "text": f"⏰ Current Time:\n{time_info}"},
{"type": "text", "text": f"💰 Live Prices:\n{prices_text}"}
]
return {"role": "user", "type": "multipart", "content": content}
def extract_final_block(ai_text):
m = re.search(r'([\s\S]*?)', ai_text)
if not m:
return None
return m.group(0), m.group(1)
def parse_scenario_from_final(final_inner):
# Extract scenario block and return structured dict or None
scen_match = re.search(r'([\s\S]*?)', final_inner)
if not scen_match:
return None
scen_inner = scen_match.group(1)
# Buy
buy_block = re.search(r'([\s\S]*?)', scen_inner)
sell_block = re.search(r'([\s\S]*?)', scen_inner)
def parse_side(block_text):
if not block_text:
return None
at_match = re.search(r'<@>(.*?)@>', block_text, re.DOTALL)
sl_match = re.search(r'(.*?)', block_text, re.DOTALL)
tp_match = re.search(r'(.*?)', block_text, re.DOTALL)
at = at_match.group(1).strip() if at_match else ""
sl = pair_number(sl_match.group(1).strip()) if sl_match else ""
tp = pair_number(tp_match.group(1).strip()) if tp_match else ""
return {"at": at, "SL": sl, "TP": tp}
buy = parse_side(buy_block.group(1) if buy_block else None)
sell = parse_side(sell_block.group(1) if sell_block else None)
if not buy and not sell:
return None
scenario_obj = {
"Buy": buy or {"at": "", "SL": "", "TP": ""},
"Sell": sell or {"at": "", "SL": "", "TP": ""},
"timestamps": build_signal_timestamps()
}
return scenario_obj
def parse_and_execute_final(final_xml_full, final_inner):
actions_performed = []
# Save latest final text
try:
ok = save_latest_final_analysis(final_xml_full)
if ok:
actions_performed.append("✅ تم حفظ التحليل النهائي (آخر واحد فقط)")
else:
actions_performed.append("❌ فشل في حفظ التحليل النهائي")
except Exception as e:
logger.error(f"Error saving final analysis: {e}")
actions_performed.append("❌ خطأ في حفظ التحليل النهائي")
# If there is an active signal in DB, we should not replace it with scenario.
state = fetch_signals_raw()
active_signal_present = state["has_active_signal"]
# Process if present (for active signal only)
if active_signal_present:
edit_matches = re.finditer(r'(.*?)', final_inner, re.DOTALL)
for edit_match in edit_matches:
try:
edit_xml = edit_match.group(1)
edit_data = {}
sl_match = re.search(r'(.*?)', edit_xml) or re.search(r'(.*?)', edit_xml)
if sl_match:
edit_data["stop_loss"] = pair_number(sl_match.group(1).strip())
tp_match = re.search(r'(.*?)', edit_xml)
if tp_match:
edit_data["take_profit"] = pair_number(tp_match.group(1).strip())
result = edit_existing_signal(edit_data) if edit_data else {"success": False, "error": "No changes to apply"}
if result.get("success"):
# Build a detailed message reflecting exactly what changed
parts = []
if "take_profit" in edit_data:
parts.append(f"💰 تم تغيير الهدف إلى: {edit_data['take_profit']} 💰")
if "stop_loss" in edit_data:
parts.append(f"🛑 تم تغيير وقف الخسارة إلى: {edit_data['stop_loss']} 🛑")
change_text = "\n".join(parts) if parts else "تم التنفيذ دون تغييرات واضحة."
send_message_to_api(f"🔄 تم تحديث الصفقة المفتوحة (SL/TP).\n{change_text}")
actions_performed.append("✅ تم تحديث الصفقة المفتوحة")
else:
actions_performed.append(f"⚠️ لم يتم تحديث الصفقة: {result.get('error')}")
except Exception as e:
logger.error(f"Error processing Edit: {e}")
actions_performed.append(f"❌ خطأ في معالجة Edit: {str(e)}")
# Process only if no active signal
if not active_signal_present:
scenario_obj = parse_scenario_from_final(final_inner)
if scenario_obj:
# Save scenario
try:
ok = save_scenario_object(scenario_obj)
if ok:
actions_performed.append("✅ تم حفظ السيناريو (استبدال أي سيناريو سابق)")
else:
actions_performed.append("❌ فشل حفظ السيناريو")
except Exception as e:
logger.error(f"Error saving scenario: {e}")
actions_performed.append(f"❌ خطأ حفظ السيناريو: {str(e)}")
# Post to tracker
buy_at = (scenario_obj.get("Buy") or {}).get("at", "")
sell_at = (scenario_obj.get("Sell") or {}).get("at", "")
ok, err = post_scenario_to_tracker(buy_at, sell_at)
if ok:
actions_performed.append("✅ تم إشعار نظام التتبع بالسيناريو")
else:
actions_performed.append(f"⚠️ فشل إشعار نظام التتبع: {err}")
else:
actions_performed.append("ℹ️ لا يوجد في التحليل النهائي أو غير صالح")
# Process
user_msg_matches = re.finditer(r'(.*?)', final_inner, re.DOTALL)
for user_msg_match in user_msg_matches:
try:
user_message = user_msg_match.group(1).strip()
if user_message:
send_result = send_message_to_api(user_message)
if send_result["success"]:
actions_performed.append("✅ تم إرسال رسالة للمستخدم")
else:
actions_performed.append("❌ فشل في إرسال رسالة للمستخدم")
else:
actions_performed.append("⚠️ رسالة فارغة تم تجاهلها")
except Exception as e:
logger.error(f"Error sending user message: {e}")
actions_performed.append(f"❌ خطأ في إرسال الرسالة: {str(e)}")
# Process
alert_matches = re.finditer(r'(.*?)', final_inner, re.DOTALL)
for alert_match in alert_matches:
try:
alert_xml = alert_match.group(1)
try:
parsed = parse_alert_block(alert_xml)
except ValueError as ve:
actions_performed.append(f"❌ Alert parse error: {str(ve)}")
continue
alert_payload = build_monitor_payload_from_alert(parsed, symbol="XAUUSD")
try:
response = requests.post(ALERT_API_URL, json=alert_payload, timeout=30)
if response.status_code == 200:
alert_message = (
"⏰ تم تعيين منبه جديد 🔔\n\n"
f"الرمز: {alert_payload.get('symbol', 'XAUUSD')}\n"
f"⏱️ المدة: {alert_payload['duration_minutes']} دقيقة\n"
"📊 سيتم إرسال تنبيه عند أول مستوى يتم عبوره."
)
send_result = send_message_to_api(alert_message)
if send_result["success"]:
actions_performed.append("✅ تم إنشاء منبه جديد وإرسال الإشعار")
else:
actions_performed.append("⚠️ تم إنشاء المنبه لكن فشل إرسال الإشعار")
else:
actions_performed.append(f"❌ فشل في إنشاء المنبه (كود: {response.status_code})")
except Exception as req_e:
actions_performed.append(f"❌ خطأ اتصال أثناء إنشاء المنبه: {str(req_e)}")
except Exception as e:
logger.error(f"Error creating alert: {e}")
actions_performed.append(f"❌ خطأ في إنشاء المنبه: {str(e)}")
return actions_performed
def edit_existing_signal(edit_data):
"""
Edit the active signal (assumed stored as an array with a single signal object) and save back as an array.
"""
try:
signals_data = db_signals.fetch_json_from_github()
if not (signals_data["success"] and signals_data["data"]):
return {"success": False, "error": "No active signal found to edit"}
raw = signals_data["data"][0]
# Expecting current storage shape to be an array; ensure we handle both array and object safely
if isinstance(raw, list) and raw and isinstance(raw[0], dict):
current_signal = raw[0]
container_is_list = True
elif isinstance(raw, dict):
# Legacy/object format, normalize to a single-element list
current_signal = raw
container_is_list = False
logger.warning("Signals DB returned an object; normalizing to array on save.")
else:
return {"success": False, "error": "No active signal found to edit"}
updates_made = []
if "stop_loss" in edit_data:
old_sl = current_signal.get("stop_loss", "N/A")
current_signal["stop_loss"] = edit_data["stop_loss"]
updates_made.append(f"stop_loss: {old_sl} → {edit_data['stop_loss']}")
if "take_profit" in edit_data:
old_tp = current_signal.get("take_profit", "N/A")
current_signal["take_profit"] = edit_data["take_profit"]
updates_made.append(f"take_profit: {old_tp} → {edit_data['take_profit']}")
if not updates_made:
return {"success": False, "error": "No changes to apply"}
auth_token, commit_oid = db_signals.fetch_authenticity_token_and_commit_oid()
if auth_token and commit_oid:
# Always save as array
updated_signal_list = [current_signal]
updated_json = json.dumps(updated_signal_list, ensure_ascii=False)
result = db_signals.update_user_json_file(auth_token, commit_oid, updated_json)
if result.get("success"):
return {"success": True, "updates": updates_made}
else:
return {"success": False, "error": "Failed to update signal"}
else:
return {"success": False, "error": "Failed to get auth tokens"}
except Exception as e:
logger.error(f"Error editing signal: {e}")
return {"success": False, "error": str(e)}
def get_current_active_tp_sl():
try:
signals_data = db_signals.fetch_json_from_github()
if signals_data["success"] and signals_data["data"]:
raw = signals_data["data"][0]
if isinstance(raw, list) and raw and isinstance(raw[0], dict):
tp = str(raw[0].get("take_profit", "")).strip()
sl = str(raw[0].get("stop_loss", "")).strip()
return {"TP": tp, "SL": sl, "found": True}
except Exception as e:
logger.error(f"Error fetching current TP/SL: {e}")
return {"TP": "", "SL": "", "found": False}
def run_multi_turn_analysis(chat_history, max_steps=10, session_id="default"):
"""
Multi-turn loop with cancellation support:
- Checks is_analysis_cancelled(session_id) at each step
- If cancelled, clears flag and returns immediately
- Otherwise continues as before
"""
steps = 0
last_ai_response = ""
while steps < max_steps:
# Cancellation check before each step
if is_analysis_cancelled(session_id):
clear_analysis_cancel(session_id)
return {
"success": True,
"message": "Analysis cancelled by stop_analysis",
"steps": steps,
"actions_performed": [],
"ai_response_preview": last_ai_response[:300]
}
steps += 1
ai_response, chat_history = call_o1_ai_api(chat_history)
last_ai_response = ai_response or ""
# If AI returned , break and process
if re.search(r'', last_ai_response):
final_full, final_inner = extract_final_block(last_ai_response)
if not final_full:
return {
"success": True,
"message": "AI final detected but malformed.",
"steps": steps,
"actions_performed": ["❌ final block malformed"],
"ai_response_preview": last_ai_response[:300]
}
# NEW: Print full chat history as roles before executing actions
try:
logger.info("=== Full Chat History (before final actions) ===")
for turn in chat_history:
role = turn.get("role", "unknown")
if turn.get("type") == "multipart":
logger.info(f"ROLE: {role}")
parts = turn.get("content", [])
for p in parts:
if p.get("type") == "text":
txt = str(p.get("text", ""))[:1000]
logger.info(f" [text] {txt}")
elif p.get("type") == "image":
logger.info(f" [image] {p.get('url','')}")
else:
logger.info(f" [part] {p}")
else:
content = turn.get("content", "")
content_preview = str(content)[:1000]
logger.info(f"ROLE: {role}\n{content_preview}")
logger.info("=== End Chat History ===")
except Exception as e:
logger.warning(f"Failed to print chat history: {e}")
actions = parse_and_execute_final(final_full, final_inner)
return {
"success": True,
"message": "Final actions executed",
"steps": steps,
"actions_performed": actions,
"ai_response_preview": last_ai_response[:300]
}
# Else check for
request
img_req = parse_img_request(last_ai_response)
if img_req:
try:
symbol = img_req["symbol"]
interval = img_req["interval"]
indicator_names = img_req["indicators"]
indicator_ids = indicators_to_pub_ids(indicator_names)
data = get_chart_screenshot(
symbol=symbol,
exchange="OANDA",
interval=interval,
indicators=indicator_ids,
width=1080,
height=1920,
full_page=False
)
png_url = data.get("imageUrl") or data.get("imageURL") or data.get("png") or "" or data.get("image_url", "")
if png_url:
user_turn = build_image_reply_user_turn(png_url)
chat_history.append(user_turn)
else:
prices_text = format_live_prices_text(get_live_prices_for_pairs())
chat_history.append({
"role": "user",
"content": f"تعذر الحصول على صورة من الخادم. اطلب صورة أخرى أو أرسل .\nالأسعار الحية الآن:\n{prices_text}"
})
except Exception as e:
logger.error(f"Chart fetch error: {e}")
prices_text = format_live_prices_text(get_live_prices_for_pairs())
chat_history.append({
"role": "user",
"content": f"حدث خطأ أثناء جلب الصورة. اطلب صورة أخرى أو أرسل .\nالأسعار الحية الآن:\n{prices_text}"
})
continue
# If neither nor
was used, nudge AI and include price snapshot
prices_text = format_live_prices_text(get_live_prices_for_pairs())
chat_history.append({
"role": "user",
"content": f"يرجى طلب صورة باستخدام
أو إنهاء التحليل بإرسال .\nالأسعار الحية الآن:\n{prices_text}"
})
# Max steps reached without final
return {
"success": True,
"message": "Maximum steps reached without final.",
"steps": steps,
"actions_performed": [],
"ai_response_preview": last_ai_response[:300]
}
@app.route('/stop_analysis', methods=['POST'])
def stop_analysis():
try:
session_id = get_session_id_from_request(request)
mark_analysis_cancelled(session_id)
return jsonify({
"success": True,
"message": f"Stop signal received. Session '{session_id}' will be cancelled at the next safe point."
})
except Exception as e:
logger.error(f"Error in stop_analysis: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/analysis_now', methods=['POST'])
def analysis_now():
try:
data = request.get_json()
alert_message = data.get('message', '') if data else ''
logger.info(f"Received alert message: {alert_message}")
# Run background processing in separate thread
def background_task(alert_message):
try:
chat_history = build_initial_chat_history(alert_message)
result = run_multi_turn_analysis(chat_history)
tags = count_xml_tags(result.get("ai_response_preview", ""))
logger.info(f"Background analysis completed. Tags: {tags}")
except Exception as e:
logger.error(f"Error in background task: {e}")
threading.Thread(target=background_task, args=(alert_message,)).start()
# Immediately return 200 OK with no data
return Response(status=200)
except Exception as e:
logger.error(f"Error in analysis_now: {e}")
return Response(status=200) # still return 200 so client knows it arrived
@app.route('/start_analysis', methods=['GET'])
def start_analysis():
try:
logger.info("Starting initial analysis (multi-turn)...")
chat_history = build_initial_chat_history()
result = run_multi_turn_analysis(chat_history)
tags = count_xml_tags(result.get("ai_response_preview", ""))
return jsonify({
"success": True,
"message": "Initial analysis completed",
"xml_tags_found": tags,
"actions_performed": result.get("actions_performed", []),
"total_actions": len(result.get("actions_performed", [])),
"steps": result.get("steps", 0),
"ai_response_preview": result.get("ai_response_preview", "")
})
except Exception as e:
logger.error(f"Error in start_analysis: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/test_actions', methods=['POST'])
def test_actions():
"""
Test endpoint for providing a block directly.
It will:
- Save only the latest final
- Execute actions: scenario (if no active trade), Edit (if active trade), send_user, Alert
"""
try:
data = request.get_json()
test_response = data.get('test_response', '') if data else ''
if not test_response:
return jsonify({"success": False, "error": "Please provide test_response in the request body"}), 400
final_tuple = extract_final_block(test_response)
if not final_tuple:
return jsonify({
"success": False,
"error": "No block found in test_response"
}), 400
final_full, final_inner = final_tuple
actions = parse_and_execute_final(final_full, final_inner)
tags = count_xml_tags(final_full)
return jsonify({
"success": True,
"message": "Test final processed",
"xml_tags_found": tags,
"actions_performed": actions,
"total_actions": len(actions),
"test_response_preview": final_full[:200] + "..." if len(final_full) > 200 else final_full
})
except Exception as e:
logger.error(f"Error in test_actions: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health_check():
return jsonify({
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"system": "XAUUSD Trading AI (multi-turn, chart-pro, scenario-mode)",
"execution_order": [
"1. Multi-turn image requests until ",
"2. Save only the single latest ",
"3. If active trade exists: allow Edit/send_user/Alert only (no scenario creation).",
"4. If no active trade: process , save, notify tracker.",
"5. No creation by AI (signals will be created by external tracker when hit)."
]
})
@app.route('/', methods=['GET'])
def index():
return jsonify({
"message": "نظام الذكاء الاصطناعي لإشارات تداول XAUUSD (وضع السيناريو متعدد المراحل)",
"endpoints": {
"/start_analysis": "بدء التحليل متعدد المراحل (GET)",
"/analysis_now": "webhook للتحليل متعدد المراحل (POST: {message})",
"/test_actions": "اختبار معالجة (POST: {test_response})",
"/health": "فحص حالة النظام (GET)"
},
"version": "4.0.0",
"notes": [
"استبدال بـ في مخرجات الذكاء الاصطناعي",
"يتم حفظ السيناريو الأخير فقط في db_signals",
"يتم إشعار نظام التتبع بالـ @ لكل من Buy/Sell",
"في حال وجود صفقة نشطة، لا يتم إنشاء سيناريو جديد بل متابعة الصفقة فقط",
"بعد كل صورة نضيف الأسعار الحية لتتبع حركة السعر خلال المحادثة"
]
})
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=7860)