XAUUSD-PRO / app.py
Dooratre's picture
Update app.py
fa6bb84 verified
raw
history blame
20 kB
import json
import time
import requests
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from fetch_news import get_recent_news_items, build_news_summary
from get_price import get_live_rates_for_pair
from ai_api import call_o1_ai_api
from db_news import (
fetch_json_from_github as fetch_news_json_from_github,
fetch_authenticity_token_and_commit_oid as fetch_news_auth_token_and_oid,
update_user_json_file as update_news_json_file
)
# NEW: import twitter db helpers
from db_twiter import (
fetch_json_from_github as fetch_twitter_json_from_github,
fetch_authenticity_token_and_commit_oid as fetch_twiter_auth_token_and_oid,
update_user_json_file as update_twiter_json_file
)
# Create Flask app
app = Flask(__name__)
# Global variable to store the last update result
last_update_result = {
"success": False,
"timestamp": None,
"message": "No updates yet"
}
# Simple logger helper
def log(step, msg):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{step}] {msg}", flush=True)
# =========================
# Screenshot helper
# =========================
SCREENSHOT_API = "https://corvo-ai-xx-sc.hf.space/capture"
def capture_screenshots(urls, width=1080, height=1920, full_page=True, timeout=400):
"""
Calls the screenshot API with one or more URLs and returns a dict:
{
'success': True/False,
'images': [ { 'srcUrl': <original>, 'imageUrl': <png> }, ... ],
'errors': [ ...raw errors... ]
}
"""
step = "SCREENSHOT"
payload = {
"urls": urls,
"width": width,
"height": height,
"fullPage": full_page
}
try:
log(step, f"Requesting screenshots for {len(urls)} URL(s): {urls} w={width} h={height} fullPage={full_page}")
resp = requests.post(SCREENSHOT_API, json=payload, timeout=timeout)
log(step, f"API status code: {resp.status_code}")
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
errors = data.get("errors", [])
log(step, f"API results count: {len(results)}, errors count: {len(errors)}")
images = []
for item in results:
out = item.get("output", {})
img_url = out.get("imageUrl")
src = item.get("url")
log(step, f"Parsed result for {src}: imageUrl={'OK' if img_url else 'MISSING'}")
if img_url:
images.append({
"srcUrl": src,
"imageUrl": img_url
})
success = len(images) > 0
log(step, f"Collected {len(images)} image(s). Success={success}")
if errors:
log(step, f"Errors: {errors}")
return {"success": success, "images": images, "errors": errors}
except Exception as e:
log(step, f"Exception: {e}")
return {"success": False, "images": [], "errors": [str(e)]}
# =========================
# Chat history builder
# =========================
def build_formatted_chat_history(
old_ai_response=None,
news_summary="",
twitter_summary="",
gold_price_data=None,
chart_images=None
):
step = "CHAT_BUILD"
log(step, "Start building chat history")
chat_history = []
# 1) role=system
log(step, "Append system role prompt")
chat_history.append({"role": "system", "content": """(أنت نظام ذكاء اصطناعي احترافي متخصص في بناء استراتيجية تداول ذكية ومباشرة على الذهب XAUUSD.
هدفك:
- إنتاج استراتيجية تداول احترافية قابلة للتنفيذ، معتمدة على مزيج من التحليل الإخباري والقراءات الفنية المتقدمة وإدارة المخاطر الديناميكية، ومهيكلة بتنسيق XML المحدد.
- اختيار زوج واحد فقط لعلاقة الارتباط لاكتشاف الفخاخ، دون شرح داخل <Relation>.
مسؤولياتك الأساسية:
1) الأخبار والسياق والسعر:
- جمع آخر النقاط الجوهرية من الأخبار المؤثرة على الذهب (الفيدرالي، التضخم، PMI، التوترات الجيوسياسية، النمو، الدولار، العوائد، سيولة السوق).
- الاستدلال على دفة المخاطر Risk-on/Risk-off.
- تحديد الاتجاه المرجّح قصير/متوسط المدى إن أمكن.
- ملاحظة السعر الحالي أو نطاق سعري حديث إن توفّر. إذا لم يتوفر سعر لحظي، استخدم نطاقاً تقديرياً مع توضيح عدم اليقين.
- إضافة ملخص تويتر مختصر مع ذكر الحسابات ذات التأثير على الذهب/الفوركس.
2) بناء الاستراتيجية:
- استراتيجية تداول احترافية متعددة السيناريوهات، تتضمن:
• شروط دخول/خروج محددة قابلة للاختبار (مستويات، مؤشرات، سرعات شموع، أحجام، وقت).
• شروط متقدمة مثل: Velocity Candle Detection، Signal Score (درجة إشارة رقمية)، فلترة الأخبار القوية (توقف/خفض حجم قبل وبعد الأخبار).
• إدارة صفقات ديناميكية Dynamic Trade Management: وقف خسارة متحرك، جني أرباح جزئي، إعادة توازن الحجم، إلغاء أو تعديل الأوامر المعلّقة.
• قواعد التعامل مع التقلبات: توسيع/تقليص الوقف، تجميد الدخول أثناء سبريد مرتفع أو عزوف سيولة.
• جداول احترافية للسيناريوهات والفخاخ المحتملة مرتبطة بالزوج المختار في <Relation>.
3) علاقة الارتباط والفخاخ:
- اختر زوجاً واحداً فقط له ارتباط مهم مع الذهب للمقارنة والكشف عن الفخاخ (مثل قوة/ضعف الدولار أو العوائد).
- استخدم هذا الزوج في الجداول لتمييز السيناريوهات المتضادة (Divergence/Confluence).
- في وسم <Relation> اكتب اسم الزوج فقط دون شرح.
4) جودة المخرجات وتنسيق XML:
- استخدم العربية الفصيحة الموجزة والعملية.
- لا تضف نصاً خارج عناصر XML المطلوبة.
- التزم بالبنية التالية حصراً:
<news>…</news>
<twiter>…</twiter>
<Strategy>…</Strategy>
<ُExpired>..<ُ/Expired>
<BestOpportunities>
<Buy>…</Buy>
<Sell>…</Sell>
</BestOpportunities>
<Relation>زوج-واحد-فقط</Relation>
تفاصيل العناصر:
• <news>
- أساسية: توجه الفيدرالي، بيانات تضخم/وظائف حديثة، تحركات عوائد وسندات، شهية المخاطرة، أحداث جيوسياسية، وضع الدولار.
- اختتم باستنتاج اتجاهي مرجّح وتأثيره المتوقع على XAUUSD.
- إذا كانت البيانات غير محدثة، اذكر ذلك بوضوح وقدّم إطار احتمالات بدل الجزم.
• <twiter>
- قدم ملخصاً مختصراً م لأبرز ما يتداوله على تويتر.
- اذكر حسابات قامت بنشر اشياء تهمك مع ذكر المنشور @username
- ركّز على المزاج العام، التحذيرات من التقلب، أي إشارات توافق/اختلاف مع السرد الإخباري.
• <Strategy>
- ضع هنا استراتيجيه قويه تقلل خسائر و تصنع ربح حوالي 100 نقطه بناء عل الاخبار و التوجه العام لارسالها الي بوت صانع صفقات
• <ُExpired>
- ضع هنا تاريخ ووقت و سعر تعتقد ان استرااتيجيتك تنتهي اذا وصلت
• <BestOpportunities>
- قدّم وصفاً عملياً قصيراً لأفضل حالات الشراء وأفضل حالات البيع وفق شروط مثالية متكاملة (مستوى، زخم، Score، فلترة أخبار، إدارة).
• <Relation>
- اسم زوج واحد فقط، مثل: DXY أو USDJPY أو US10Y أو EURUSD. اكتب الرمز/الاسم فقط بلا أي شرح.
معايير الدقة والشفافية:
- لا تخترع أسعاراً مؤكدة إن لم تكن متاحة؛ استخدم نطاقات تقريبية مع توضيح عدم اليقين.
- اذكر الافتراضات بوضوح.
- احرص أن تكون القواعد قابلة للتنفيذ وليست عامة أو إنشائية.
- لا تكرر المحتوى بلا داع.
- أعد المخرجات دائماً بصيغة XML بالترتيب المحدد أعلاه دون إضافات.)"""})
# 2) role=user -> chart images as multipart entries
if chart_images:
log(step, f"Appending {len(chart_images)} chart image(s) to chat")
for img in chart_images:
timeframe = img.get("timeframe", "").strip()
image_url = img.get("imageUrl")
if image_url:
chat_history.append({
"role": "user",
"type": "multipart",
"content": [
{"type": "image", "url": image_url},
{"type": "text", "text": f"هذه لقطة شاشة لزوج XAUUSD إطار زمني {timeframe}"}
]
})
log(step, f"Added image message for timeframe={timeframe}, url={image_url}")
else:
log(step, f"Skipped image missing URL for timeframe={timeframe}")
else:
log(step, "No chart images available to append")
# 3) role=user -> News + Twitter + Price
log(step, "Appending news + twitter + price text block")
user_content = "News from Tridingview News :"
if news_summary:
user_content += f"{news_summary}\n\n"
else:
user_content += "No fresh news summary available.\n\n"
if twitter_summary:
user_content += "Twitter Feed Summary:\n"
user_content += f"{twitter_summary}\n\n"
else:
user_content += "Twitter Feed Summary: No recent Twitter data available.\n\n"
if gold_price_data is not None:
user_content += (
f"Gold Price [XAUUSD]:\n"
f" Bid: {gold_price_data.get('bid')}\n"
f" Ask: {gold_price_data.get('ask')}\n"
f" Spread: {gold_price_data.get('difference')}\n"
)
chat_history.append({"role": "user", "content": user_content})
log(step, "Appended news/price block")
# 4) role=user -> OLD ANALYSIS if exists
if old_ai_response:
chat_history.append({"role": "user", "content": "OLD ANALYSIS FROM YOU:" + old_ai_response})
log(step, "Appended old AI analysis")
else:
log(step, "No old AI analysis available")
# 5) role=user -> update question
update_prompt = "هل تريد تحديث تحليلك واستراتيجيتك بناءً على هذه المعطيات؟"
chat_history.append({"role": "user", "content": update_prompt})
log(step, "Appended update question")
log(step, "Chat history build complete")
return chat_history
def update_strategy_job():
"""Function to be scheduled, runs the main logic"""
global last_update_result
step = "JOB"
log(step, "Scheduled update started")
try:
# 1) Fetch old data from news.json in GitHub
log(step, "Fetching old data from GitHub (news.json)")
old_data_response = fetch_news_json_from_github()
old_ai_response = None
if old_data_response.get("success") and old_data_response.get("data"):
log(step, "Old data fetch success")
old_data = old_data_response["data"]
old_ai_response = json.dumps(old_data, ensure_ascii=False, separators=(',', ':'))
else:
log(step, f"Old data fetch failed or empty: {old_data_response}")
old_ai_response = None
# 2) Fetch news + gold price
log(step, "Fetching recent news items")
news_items = get_recent_news_items(hours=24)
log(step, f"Fetched {len(news_items) if news_items else 0} news item(s)")
news_summary = build_news_summary(news_items)
log(step, "Built news summary")
log(step, "Fetching live gold price XAUUSD")
gold_price_data = get_live_rates_for_pair("XAUUSD")
log(step, f"Price data: {gold_price_data}")
# 2b) Fetch Twitter summary from twiter.json
log(step, "Fetching twitter summary from GitHub (twiter.json)")
twitter_summary = ""
try:
tw_resp = fetch_twitter_json_from_github()
if tw_resp.get("success") and tw_resp.get("data"):
data = tw_resp["data"]
if isinstance(data, dict) and "twiter" in data:
twitter_summary = data.get("twiter", "")
elif isinstance(data, str):
twitter_summary = data
else:
twitter_summary = json.dumps(data, ensure_ascii=False)
log(step, "Twitter summary fetched successfully")
else:
log(step, f"Twitter fetch returned no data: {tw_resp}")
twitter_summary = ""
except Exception as te:
log(step, f"Twitter fetch error: {te}")
twitter_summary = ""
# 2c) Capture chart screenshots for 15m and 1h
charts = [
{
"url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=15&exchange=OANDA",
"timeframe": "15 دقيقة"
},
{
"url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=60&exchange=OANDA",
"timeframe": "1 ساعة"
}
]
chart_images = []
try:
log(step, "Capturing chart screenshots (15m, 1h)")
capture_result = capture_screenshots([c["url"] for c in charts], width=1920, height=1080, full_page=False)
if capture_result.get("success"):
log(step, "Screenshot capture success")
src_to_tf = {c["url"]: c["timeframe"] for c in charts}
for img in capture_result.get("images", []):
chart_images.append({
"timeframe": src_to_tf.get(img.get("srcUrl"), ""),
"imageUrl": img.get("imageUrl")
})
log(step, f"Prepared {len(chart_images)} chart image entries for chat")
else:
errs = capture_result.get("errors")
log(step, f"Screenshot capture failed. Errors: {errs}")
except Exception as e:
log(step, f"Screenshot exception: {e}")
# 3) Build chat_history including images
log(step, "Building chat history")
chat_history = build_formatted_chat_history(
old_ai_response=old_ai_response,
news_summary=news_summary,
twitter_summary=twitter_summary,
gold_price_data=gold_price_data,
chart_images=chart_images
)
log(step, f"Chat history ready. Messages count: {len(chat_history)}")
# 4) Call AI
log(step, "Calling AI API with chat history")
ai_response, updated_chat_history = call_o1_ai_api(chat_history)
log(step, "AI API call completed")
# 5) Save AI response to GitHub in news.json
log(step, "Serializing AI response for GitHub")
result_json_object = {
"response": ai_response,
"timestamp": time.time()
}
new_content_one_line = json.dumps([result_json_object], ensure_ascii=False, separators=(',', ':'))
log(step, f"Serialized content length: {len(new_content_one_line)}")
# 6) Get authenticity_token and commit_oid
log(step, "Fetching authenticity token and commit oid for news.json")
token, commit_oid = fetch_news_auth_token_and_oid()
if not token or not commit_oid:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": "Could not fetch authenticity token or commit oid for news.json."
}
log(step, last_update_result['message'])
return
log(step, "Fetched token and commit oid successfully")
# 7) Update GitHub file
log(step, "Updating news.json on GitHub")
update_result = update_news_json_file(token, commit_oid, new_content_one_line)
if update_result.get("success"):
last_update_result = {
"success": True,
"timestamp": time.time(),
"message": "Updated news.json successfully!",
"response": ai_response
}
log(step, last_update_result['message'])
else:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": f"Failed to update news.json: {update_result.get('message')}"
}
log(step, last_update_result['message'])
except Exception as e:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": f"Error during update: {str(e)}"
}
log(step, last_update_result['message'])
# API routes
@app.route('/status', methods=['GET'])
def get_status():
"""Returns the status of the last update"""
log("API", "GET /status called")
return jsonify({
"success": last_update_result.get("success", False),
"timestamp": last_update_result.get("timestamp"),
"message": last_update_result.get("message"),
"last_updated": time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(last_update_result.get("timestamp")))
if last_update_result.get("timestamp") else None
})
@app.route('/force-update', methods=['GET'])
def force_update():
"""Force an immediate update"""
log("API", "GET /force-update called")
update_strategy_job()
return jsonify({
"success": last_update_result.get("success", False),
"message": "Update job triggered",
"result": last_update_result
})
@app.route('/latest', methods=['GET'])
def get_latest():
"""Get the latest strategy response"""
log("API", "GET /latest called")
if "response" in last_update_result:
return jsonify({
"success": True,
"timestamp": last_update_result.get("timestamp"),
"response": last_update_result.get("response"),
"last_updated": time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(last_update_result.get("timestamp")))
})
else:
return jsonify({
"success": False,
"message": "No strategy data available yet"
})
# Setup the scheduler
def init_scheduler():
"""Initialize and start the scheduler"""
scheduler = BackgroundScheduler()
scheduler.add_job(func=update_strategy_job, trigger="interval", hours=1)
scheduler.start()
log("SCHED", "Scheduler started: will run every hour")
# Run once immediately on startup
log("SCHED", "Running initial update job on startup")
update_strategy_job()
if __name__ == "__main__":
# Initialize the scheduler before starting the Flask app
init_scheduler()
# Start the Flask app
log("APP", "Starting Flask server on port 7860")
app.run(host='0.0.0.0', port=7860)