Spaces:
Paused
Paused
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import time | |
import signal | |
import logging | |
import threading | |
import subprocess | |
import central_manager | |
from flask import Flask, jsonify | |
# إعدادات عامة | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
) | |
HERE = os.path.dirname(os.path.abspath(__file__)) | |
PYTHON = sys.executable # نفس مفسّر بايثون الحالي | |
LISTENER_PATH = os.path.join(HERE, "telegram_listener.py") | |
CHECK_INTERVAL = 5 # ثواني بين فحوصات الحارس | |
app = Flask(__name__) | |
_listener_proc = None | |
_stop_flag = False | |
def _spawn_listener(): | |
"""تشغيل telegram_listener.py كعملية خلفية.""" | |
if not os.path.isfile(LISTENER_PATH): | |
logging.error("لم يتم العثور على telegram_listener.py في: %s", LISTENER_PATH) | |
return None | |
try: | |
proc = subprocess.Popen( | |
[PYTHON, LISTENER_PATH], | |
cwd=HERE, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
text=True, | |
bufsize=1, | |
) | |
logging.info("تم تشغيل telegram_listener.py (pid=%s)", proc.pid) | |
return proc | |
except Exception as e: | |
logging.exception("تعذّر تشغيل telegram_listener.py: %s", e) | |
return None | |
def _watchdog(): | |
"""حارس لإبقاء المستمع شغّالاً دائماً؛ يعيد تشغيله عند التوقف.""" | |
global _listener_proc, _stop_flag | |
while not _stop_flag: | |
if _listener_proc is None or _listener_proc.poll() is not None: | |
# إذا كان غير موجود أو متوقف — شغّله | |
_listener_proc = _spawn_listener() | |
time.sleep(CHECK_INTERVAL) | |
def _start_watchdog_once(): | |
"""تشغيل خيط الحارس مرّة واحدة.""" | |
if not getattr(_start_watchdog_once, "started", False): | |
t = threading.Thread(target=_watchdog, daemon=True) | |
t.start() | |
_start_watchdog_once.started = True | |
logging.info("تم تشغيل خيط الحارس.") | |
def index(): | |
# تأكد أن الحارس يعمل (مفيد إذا أعاد السيرفر التحميل) | |
_start_watchdog_once() | |
return "🚀 App is running… Telegram listener watchdog is active." | |
def health(): | |
alive = (_listener_proc is not None) and (_listener_proc.poll() is None) | |
return jsonify( | |
status="ok", | |
listener_running=alive, | |
pid=_listener_proc.pid if alive else None, | |
) | |
def start_telegram(): | |
global _listener_proc | |
if _listener_proc is None or _listener_proc.poll() is not None: | |
_listener_proc = _spawn_listener() | |
if _listener_proc is None: | |
return "⚠️ فشل تشغيل telegram_listener.py (تحقق من السجلات).", 500 | |
return "✅ تم تشغيل telegram_listener.py.", 200 | |
return "ℹ️ المستمع يعمل بالفعل.", 200 | |
def stop_telegram(): | |
global _listener_proc | |
if _listener_proc and _listener_proc.poll() is None: | |
try: | |
_listener_proc.terminate() | |
_listener_proc.wait(timeout=10) | |
logging.info("تم إيقاف telegram_listener.py (pid=%s).", _listener_proc.pid) | |
except Exception: | |
try: | |
_listener_proc.kill() | |
except Exception: | |
pass | |
_listener_proc = None | |
return "🛑 تم إيقاف telegram_listener.py.", 200 | |
return "ℹ️ لا توجد عملية مستمع تعمل.", 200 | |
def _graceful_shutdown(*_args): | |
"""إيقاف أنيق عند إنهاء الحاوية.""" | |
global _stop_flag, _listener_proc | |
_stop_flag = True | |
if _listener_proc and _listener_proc.poll() is None: | |
try: | |
_listener_proc.terminate() | |
_listener_proc.wait(timeout=10) | |
except Exception: | |
try: | |
_listener_proc.kill() | |
except Exception: | |
pass | |
# ربط إشارات الإنهاء (مهم لـ Spaces) | |
signal.signal(signal.SIGTERM, _graceful_shutdown) | |
signal.signal(signal.SIGINT, _graceful_shutdown) | |
if __name__ == "__main__": | |
# شغّل الحارس تلقائياً عند تشغيل السيرفر | |
_start_watchdog_once() | |
port = int(os.environ.get("PORT", "7860")) | |
app.run(host="0.0.0.0", port=port) | |
#!/usr/bin/env python3 | |
# central_manager.py | |
import time | |
import threading | |
from typing import Dict, List | |
import requests | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from peer_discovery import PORT | |
# ---- إعداد FastAPI ---------------------------------------------------------- | |
app = FastAPI(title="Central Task Manager") | |
# ---- نماذج البيانات -------------------------------------------------------- | |
class RegisterRequest(BaseModel): | |
"""تسجيل أو تجديد ظهور العقدة.""" | |
url: str # مثلاً: "http://203.0.113.45:PORT/run" | |
load: float = 0.0 # نسبة تحميل العقدة (0.0 - 1.0)، اختياري | |
class TaskRequest(BaseModel): | |
func: str | |
args: List = [] | |
kwargs: Dict = {} | |
complexity: float = 0.0 | |
# ---- سجلّ العقد ------------------------------------------------------------ | |
# بنخزّن للعقدة: آخر timestamp و load | |
peers: Dict[str, Dict] = {} | |
HEARTBEAT_TTL = 60 # ثواني قبل اعتبار العقدة متوقفة | |
HEALTH_CHECK_FREQ = 30 # ثواني بين فحوص الصحة الداخلية | |
# ---- API للعقد لتسجيل نفسها ----------------------------------------------- | |
async def register_peer(req: RegisterRequest): | |
"""العقدة تستدعي هذه النقطة كلما انطلقت أو دورياً لتجديد ظهورها.""" | |
peers[req.url] = {"last_seen": time.time(), "load": req.load} | |
return {"status": "ok", "peers_count": len(peers)} | |
# ---- API للعمليات --------------------------------------------------------- | |
async def list_peers(): | |
"""يعيد قائمة بالعقد الصالحة بعد تنقية المتوقفة.""" | |
now = time.time() | |
# حذف العقد المتوقفة | |
for url, info in list(peers.items()): | |
if now - info["last_seen"] > HEARTBEAT_TTL: | |
peers.pop(url) | |
return list(peers.keys()) | |
async def dispatch_task(task: TaskRequest): | |
"""يتلقى مهمة ويعيد توجيهها لأفضل عقدة أو ينفذ محليّاً.""" | |
available = await list_peers() | |
if not available: | |
raise HTTPException(503, "لا توجد عقد متاحة حاليّاً") | |
# خوارزمية بسيطة: الاختيار بناءً على أقل تحميل معلن | |
# أو تدوير دائري إذا لم يعلن أحد عن تحميله | |
best = None | |
best_load = 1.1 | |
for url in available: | |
load = peers[url].get("load", None) | |
if load is None: | |
best = url | |
break | |
if load < best_load: | |
best, best_load = url, load | |
if not best: | |
best = available[0] | |
# إعادة توجيه الطلب | |
try: | |
resp = requests.post(best, json=task.dict(), timeout=10) | |
resp.raise_for_status() | |
return resp.json() | |
except Exception as e: | |
raise HTTPException(502, f"فشل التوجيه إلى {best}: {e}") | |
# ---- فحص دوري لصحة العقد --------------------------------------------------- | |
def health_check_loop(): | |
while True: | |
now = time.time() | |
for url in list(peers.keys()): | |
health_url = url.replace("/run", "/health") | |
try: | |
r = requests.get(health_url, timeout=3) | |
if r.status_code == 200: | |
peers[url]["last_seen"] = now | |
# يمكنك تحديث load من رد /health إذا وفّرته | |
else: | |
peers.pop(url) | |
except: | |
peers.pop(url) | |
time.sleep(HEALTH_CHECK_FREQ) | |
# ---- تشغيل الخلفيات وخادم FastAPI ------------------------------------------ | |
if __name__ == "__main__": | |
# شغل لوب الفحص الطبي في الخلفية | |
threading.Thread(target=health_check_loop, daemon=True).start() | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=1500) | |
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
main.py — نظام توزيع المهام الذكي | |
""" | |
import os | |
import sys | |
import time | |
import threading | |
import subprocess | |
import logging | |
import argparse | |
import socket | |
import random | |
import requests | |
import importlib.util | |
from pathlib import Path | |
from typing import Any | |
from flask import Flask, request, jsonify | |
from flask_cors import CORS | |
from peer_discovery import CENTRAL_REGISTRY_SERVERS | |
# ─────────────── إعدادات المسارات ─────────────── | |
FILE = Path(__file__).resolve() | |
BASE_DIR = FILE.parent | |
sys.path.insert(0, str(BASE_DIR)) | |
# ─────────────── إعداد السجلات ─────────────── | |
os.makedirs("logs", exist_ok=True) | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
handlers=[ | |
logging.StreamHandler(sys.stdout), | |
logging.FileHandler("logs/main.log", mode="a", encoding="utf-8") | |
] | |
) | |
# ─────────────── تحميل متغيرات البيئة ─────────────── | |
try: | |
from dotenv import load_dotenv | |
load_dotenv() | |
logging.info("تم تحميل متغيرات البيئة من .env") | |
except ImportError: | |
logging.warning("python-dotenv غير مثبَّت؛ تَخطّي .env") | |
# ─────────────── ثوابت التهيئة ─────────────── | |
CPU_PORT = int(os.getenv("CPU_PORT", "5297")) | |
SHARED_SECRET = os.getenv("SHARED_SECRET", "my_shared_secret_123") | |
PYTHON_EXE = sys.executable | |
# ─────────────── خيارات سطر الأوامر ─────────────── | |
parser = argparse.ArgumentParser(description="نظام توزيع المهام الذكي") | |
parser.add_argument( | |
"--stats-interval", "-s", | |
type=int, | |
default=0, | |
help="ثواني بين كل طباعة لإحصائية الأقران (0 = مرة واحدة فقط)" | |
) | |
parser.add_argument( | |
"--no-cli", | |
action="store_true", | |
help="تعطيل القائمة التفاعلية حتى عند وجود TTY" | |
) | |
args = parser.parse_args() | |
# ─────────────── متغيرات النظام ─────────────── | |
PEERS = set() # مجموعة عناوين الأقران كسلاسل نصية | |
PEERS_INFO = {} # قاموس لحفظ معلومات الأقران الكاملة | |
current_server_index = 0 | |
# ─────────────── دوال اكتشاف الأقران ─────────────── | |
def register_service_lan(): | |
"""تسجيل الخدمة على الشبكة المحلية""" | |
while True: | |
try: | |
logging.info("جارٍ تسجيل الخدمة على الشبكة المحلية...") | |
time.sleep(10) | |
except Exception as e: | |
logging.error(f"خطأ في تسجيل الخدمة: {e}") | |
def discover_lan_loop(): | |
"""اكتشاف الأقران على الشبكة المحلية""" | |
while True: | |
try: | |
logging.info("جارٍ مسح الشبكة المحلية...") | |
time.sleep(15) | |
except Exception as e: | |
logging.error(f"خطأ في اكتشاف الأقران: {e}") | |
def fetch_central_loop(): | |
"""جلب تحديثات من السيرفر المركزي""" | |
while True: | |
try: | |
logging.info("جارٍ تحديث قائمة الأقران...") | |
time.sleep(30) | |
except Exception as e: | |
logging.error(f"خطأ في جلب التحديثات: {e}") | |
# ─────────────── دوال مساعدة ─────────────── | |
def get_local_ip(): | |
"""الحصول على عنوان IP المحلي""" | |
try: | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
s.connect(("8.8.8.8", 80)) | |
ip = s.getsockname()[0] | |
s.close() | |
return ip | |
except Exception: | |
return "127.0.0.1" | |
def add_peer(peer_data): | |
"""إضافة قرين جديد إلى النظام""" | |
peer_url = f"http://{peer_data['ip']}:{peer_data['port']}/run" | |
if peer_url not in PEERS: | |
PEERS.add(peer_url) | |
PEERS_INFO[peer_url] = peer_data | |
logging.info(f"تمت إضافة قرين جديد: {peer_url}") | |
return peer_url | |
def benchmark(fn, *args): | |
"""قياس زمن تنفيذ الدالة""" | |
t0 = time.time() | |
res = fn(*args) | |
return time.time() - t0, res | |
def load_and_run_peer_discovery(): | |
"""تحميل وتشغيل ملف peer_discovery.py""" | |
try: | |
peer_discovery_path = Path(__file__).parent / "peer_discovery.py" | |
if not peer_discovery_path.exists(): | |
raise FileNotFoundError("ملف peer_discovery.py غير موجود") | |
spec = importlib.util.spec_from_file_location("peer_discovery_module", peer_discovery_path) | |
peer_module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(peer_module) | |
logging.info("تم تحميل peer_discovery.py بنجاح") | |
return peer_module | |
except Exception as e: | |
logging.error(f"خطأ في تحميل peer_discovery.py: {str(e)}") | |
return None | |
# ─────────────── دوال المهام ─────────────── | |
def example_task(x: int) -> int: | |
"""دالة مثال بديلة إذا لم تكن موجودة في your_tasks.py""" | |
return x * x | |
def matrix_multiply(size: int) -> list: | |
"""ضرب المصفوفات (بديل مؤقت)""" | |
return [[i*j for j in range(size)] for i in range(size)] | |
def prime_calculation(limit: int) -> list: | |
"""حساب الأعداد الأولية (بديل مؤقت)""" | |
primes = [] | |
for num in range(2, limit): | |
if all(num % i != 0 for i in range(2, int(num**0.5) + 1)): | |
primes.append(num) | |
return primes | |
def data_processing(size: int) -> dict: | |
"""معالجة البيانات (بديل مؤقت)""" | |
return {i: i**2 for i in range(size)} | |
# ─────────────── خادم Flask ─────────────── | |
flask_app = Flask(__name__) | |
CORS(flask_app, resources={r"/*": {"origins": "*"}}) | |
def run_task(): | |
try: | |
data = request.get_json() if request.is_json else request.form | |
task_id = data.get("task_id") | |
if not task_id: | |
return jsonify(error="يجب تحديد task_id"), 400 | |
if task_id == "1": | |
result = matrix_multiply(500) | |
elif task_id == "2": | |
result = prime_calculation(100_000) | |
elif task_id == "3": | |
result = data_processing(10_000) | |
else: | |
return jsonify(error="معرف المهمة غير صحيح"), 400 | |
return jsonify(result=result) | |
except Exception as e: | |
logging.error(f"خطأ في معالجة المهمة: {str(e)}", exc_info=True) | |
return jsonify(error="حدث خطأ داخلي في الخادم"), 500 | |
def start_flask_server(): | |
ip_public = os.getenv("PUBLIC_IP", "127.0.0.1") | |
logging.info(f"Flask متوفر على: http://{ip_public}:{CPU_PORT}/run_task") | |
flask_app.run(host="0.0.0.0", port=CPU_PORT, debug=False) | |
# ─────────────── دوال النظام الأساسية ─────────────── | |
def connect_until_success(): | |
global CPU_PORT, current_server_index | |
peer_module = load_and_run_peer_discovery() | |
if peer_module is None: | |
logging.warning("سيستمر التشغيل بدون peer_discovery.py") | |
return None, [] | |
CENTRAL_REGISTRY_SERVERS = getattr(peer_module, 'CENTRAL_REGISTRY_SERVERS', []) | |
if not CENTRAL_REGISTRY_SERVERS: | |
logging.error("قائمة السيرفرات المركزية فارغة") | |
return None, [] | |
while True: | |
for port in [CPU_PORT, 5298, 5299]: | |
for idx, server in enumerate(CENTRAL_REGISTRY_SERVERS): | |
info = { | |
"node_id": os.getenv("NODE_ID", socket.gethostname()), | |
"ip": get_local_ip(), | |
"port": port | |
} | |
try: | |
resp = requests.post(f"{server}/register", json=info, timeout=5) | |
resp.raise_for_status() | |
CPU_PORT = port | |
current_server_index = idx | |
logging.info(f"تم الاتصال بالسيرفر: {server} على المنفذ {CPU_PORT}") | |
# معالجة قائمة الأقران المستلمة | |
peers_list = resp.json() | |
peer_urls = [] | |
for p in peers_list: | |
peer_url = add_peer(p) | |
peer_urls.append(peer_url) | |
return server, peer_urls | |
except Exception as e: | |
logging.warning(f"فشل الاتصال بـ {server}: {str(e)}") | |
time.sleep(5) | |
def main(): | |
"""الدالة الرئيسية لتشغيل النظام""" | |
# تشغيل الخدمات الأساسية | |
try: | |
subprocess.Popen([PYTHON_EXE, "peer_server.py", "--port", str(CPU_PORT)]) | |
subprocess.Popen([PYTHON_EXE, "load_balancer.py"]) | |
logging.info("تم تشغيل الخدمات الخلفيّة") | |
except Exception as exc: | |
logging.error(f"خطأ بتشغيل الخدمات الخلفية: {exc}") | |
# الاتصال بالسيرفر المركزي | |
server, initial_peers = connect_until_success() | |
# تشغيل خادم Flask | |
threading.Thread(target=start_flask_server, daemon=True).start() | |
# البقاء في حلقة رئيسية | |
try: | |
while True: | |
time.sleep(1) | |
except KeyboardInterrupt: | |
logging.info("تم إنهاء البرنامج.") | |
# إضافة القرين المحلي | |
add_peer({"ip": "127.0.0.1", "port": CPU_PORT}) | |
# تشغيل خدمات اكتشاف الأقران | |
threading.Thread(target=register_service_lan, daemon=True).start() | |
threading.Thread(target=discover_lan_loop, daemon=True).start() | |
threading.Thread(target=fetch_central_loop, daemon=True).start() | |
# بدء النظام الرئيسي | |
main() | |
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
main.py — نظام توزيع المهام الذكي | |
""" | |
import os | |
import sys | |
import time | |
import threading | |
import subprocess | |
import logging | |
import argparse | |
import socket | |
import random | |
import requests | |
import importlib.util | |
from pathlib import Path | |
from typing import Any | |
from flask import Flask, request, jsonify | |
from flask_cors import CORS | |
from peer_discovery import CENTRAL_REGISTRY_SERVERS | |
# ─────────────── إعدادات المسارات ─────────────── | |
FILE = Path(__file__).resolve() | |
BASE_DIR = FILE.parent | |
sys.path.insert(0, str(BASE_DIR)) | |
# ─────────────── إعداد السجلات ─────────────── | |
os.makedirs("logs", exist_ok=True) | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
handlers=[ | |
logging.StreamHandler(sys.stdout), | |
logging.FileHandler("logs/main.log", mode="a", encoding="utf-8") | |
] | |
) | |
# ─────────────── تحميل متغيرات البيئة ─────────────── | |
try: | |
from dotenv import load_dotenv | |
load_dotenv() | |
logging.info("تم تحميل متغيرات البيئة من .env") | |
except ImportError: | |
logging.warning("python-dotenv غير مثبَّت؛ تَخطّي .env") | |
# ─────────────── ثوابت التهيئة ─────────────── | |
CPU_PORT = int(os.getenv("CPU_PORT", "5297")) | |
SHARED_SECRET = os.getenv("SHARED_SECRET", "my_shared_secret_123") | |
PYTHON_EXE = sys.executable | |
# ─────────────── خيارات سطر الأوامر ─────────────── | |
parser = argparse.ArgumentParser(description="نظام توزيع المهام الذكي") | |
parser.add_argument( | |
"--stats-interval", "-s", | |
type=int, | |
default=0, | |
help="ثواني بين كل طباعة لإحصائية الأقران (0 = مرة واحدة فقط)" | |
) | |
parser.add_argument( | |
"--no-cli", | |
action="store_true", | |
help="تعطيل القائمة التفاعلية حتى عند وجود TTY" | |
) | |
args = parser.parse_args() | |
# ─────────────── متغيرات النظام ─────────────── | |
PEERS = set() # مجموعة عناوين الأقران كسلاسل نصية | |
PEERS_INFO = {} # قاموس لحفظ معلومات الأقران الكاملة | |
current_server_index = 0 | |
# ─────────────── دوال اكتشاف الأقران ─────────────── | |
def register_service_lan(): | |
"""تسجيل الخدمة على الشبكة المحلية""" | |
while True: | |
try: | |
logging.info("جارٍ تسجيل الخدمة على الشبكة المحلية...") | |
time.sleep(10) | |
except Exception as e: | |
logging.error(f"خطأ في تسجيل الخدمة: {e}") | |
def discover_lan_loop(): | |
"""اكتشاف الأقران على الشبكة المحلية""" | |
while True: | |
try: | |
logging.info("جارٍ مسح الشبكة المحلية...") | |
time.sleep(15) | |
except Exception as e: | |
logging.error(f"خطأ في اكتشاف الأقران: {e}") | |
def fetch_central_loop(): | |
"""جلب تحديثات من السيرفر المركزي""" | |
while True: | |
try: | |
logging.info("جارٍ تحديث قائمة الأقران...") | |
time.sleep(30) | |
except Exception as e: | |
logging.error(f"خطأ في جلب التحديثات: {e}") | |
# ─────────────── دوال مساعدة ─────────────── | |
def get_local_ip(): | |
"""الحصول على عنوان IP المحلي""" | |
try: | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
s.connect(("8.8.8.8", 80)) | |
ip = s.getsockname()[0] | |
s.close() | |
return ip | |
except Exception: | |
return "127.0.0.1" | |
def add_peer(peer_data): | |
"""إضافة قرين جديد إلى النظام""" | |
peer_url = f"http://{peer_data['ip']}:{peer_data['port']}/run" | |
if peer_url not in PEERS: | |
PEERS.add(peer_url) | |
PEERS_INFO[peer_url] = peer_data | |
logging.info(f"تمت إضافة قرين جديد: {peer_url}") | |
return peer_url | |
def benchmark(fn, *args): | |
"""قياس زمن تنفيذ الدالة""" | |
t0 = time.time() | |
res = fn(*args) | |
return time.time() - t0, res | |
def load_and_run_peer_discovery(): | |
"""تحميل وتشغيل ملف peer_discovery.py""" | |
try: | |
peer_discovery_path = Path(__file__).parent / "peer_discovery.py" | |
if not peer_discovery_path.exists(): | |
raise FileNotFoundError("ملف peer_discovery.py غير موجود") | |
spec = importlib.util.spec_from_file_location("peer_discovery_module", peer_discovery_path) | |
peer_module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(peer_module) | |
logging.info("تم تحميل peer_discovery.py بنجاح") | |
return peer_module | |
except Exception as e: | |
logging.error(f"خطأ في تحميل peer_discovery.py: {str(e)}") | |
return None | |
# ─────────────── دوال المهام ─────────────── | |
def example_task(x: int) -> int: | |
"""دالة مثال بديلة إذا لم تكن موجودة في your_tasks.py""" | |
return x * x | |
def matrix_multiply(size: int) -> list: | |
"""ضرب المصفوفات (بديل مؤقت)""" | |
return [[i*j for j in range(size)] for i in range(size)] | |
def prime_calculation(limit: int) -> list: | |
"""حساب الأعداد الأولية (بديل مؤقت)""" | |
primes = [] | |
for num in range(2, limit): | |
if all(num % i != 0 for i in range(2, int(num**0.5) + 1)): | |
primes.append(num) | |
return primes | |
def data_processing(size: int) -> dict: | |
"""معالجة البيانات (بديل مؤقت)""" | |
return {i: i**2 for i in range(size)} | |
# ─────────────── خادم Flask ─────────────── | |
flask_app = Flask(__name__) | |
CORS(flask_app, resources={r"/*": {"origins": "*"}}) | |
def run_task(): | |
try: | |
data = request.get_json() if request.is_json else request.form | |
task_id = data.get("task_id") | |
if not task_id: | |
return jsonify(error="يجب تحديد task_id"), 400 | |
if task_id == "1": | |
result = matrix_multiply(500) | |
elif task_id == "2": | |
result = prime_calculation(100_000) | |
elif task_id == "3": | |
result = data_processing(10_000) | |
else: | |
return jsonify(error="معرف المهمة غير صحيح"), 400 | |
return jsonify(result=result) | |
except Exception as e: | |
logging.error(f"خطأ في معالجة المهمة: {str(e)}", exc_info=True) | |
return jsonify(error="حدث خطأ داخلي في الخادم"), 500 | |
def start_flask_server(): | |
ip_public = os.getenv("PUBLIC_IP", "127.0.0.1") | |
logging.info(f"Flask متوفر على: http://{ip_public}:{CPU_PORT}/run_task") | |
flask_app.run(host="0.0.0.0", port=CPU_PORT, debug=False) | |
# ─────────────── دوال النظام الأساسية ─────────────── | |
def connect_until_success(): | |
global CPU_PORT, current_server_index | |
peer_module = load_and_run_peer_discovery() | |
if peer_module is None: | |
logging.warning("سيستمر التشغيل بدون peer_discovery.py") | |
return None, [] | |
CENTRAL_REGISTRY_SERVERS = getattr(peer_module, 'CENTRAL_REGISTRY_SERVERS', []) | |
if not CENTRAL_REGISTRY_SERVERS: | |
logging.error("قائمة السيرفرات المركزية فارغة") | |
return None, [] | |
while True: | |
for port in [CPU_PORT, 5298, 5299]: | |
for idx, server in enumerate(CENTRAL_REGISTRY_SERVERS): | |
info = { | |
"node_id": os.getenv("NODE_ID", socket.gethostname()), | |
"ip": get_local_ip(), | |
"port": port | |
} | |
try: | |
resp = requests.post(f"{server}/register", json=info, timeout=5) | |
resp.raise_for_status() | |
CPU_PORT = port | |
current_server_index = idx | |
logging.info(f"تم الاتصال بالسيرفر: {server} على المنفذ {CPU_PORT}") | |
# معالجة قائمة الأقران المستلمة | |
peers_list = resp.json() | |
peer_urls = [] | |
for p in peers_list: | |
peer_url = add_peer(p) | |
peer_urls.append(peer_url) | |
return server, peer_urls | |
except Exception as e: | |
logging.warning(f"فشل الاتصال بـ {server}: {str(e)}") | |
time.sleep(5) | |
def main(): | |
"""الدالة الرئيسية لتشغيل النظام""" | |
# تشغيل الخدمات الأساسية | |
try: | |
subprocess.Popen([PYTHON_EXE, "peer_server.py", "--port", str(CPU_PORT)]) | |
subprocess.Popen([PYTHON_EXE, "load_balancer.py"]) | |
logging.info("تم تشغيل الخدمات الخلفيّة") | |
except Exception as exc: | |
logging.error(f"خطأ بتشغيل الخدمات الخلفية: {exc}") | |
# الاتصال بالسيرفر المركزي | |
server, initial_peers = connect_until_success() | |
# تشغيل خادم Flask | |
threading.Thread(target=start_flask_server, daemon=True).start() | |
# البقاء في حلقة رئيسية | |
try: | |
while True: | |
time.sleep(1) | |
except KeyboardInterrupt: | |
logging.info("تم إنهاء البرنامج.") | |
if __name__ == "__main__": | |
# إضافة القرين المحلي | |
add_peer({"ip": "127.0.0.1", "port": CPU_PORT}) | |
# تشغيل خدمات اكتشاف الأقران | |
threading.Thread(target=register_service_lan, daemon=True).start() | |
threading.Thread(target=discover_lan_loop, daemon=True).start() | |
threading.Thread(target=fetch_central_loop, daemon=True).start() | |
# بدء النظام الرئيسي | |
main() | |
if __name__ == "__main__": | |