Spaces:
Running
Running
#!/usr/bin/env python3 | |
# central_manager.py | |
import time | |
import threading | |
from typing import Dict, List | |
import requests | |
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from peer_discovery import PORT | |
# ---- إعداد FastAPI ---------------------------------------------------------- | |
app = FastAPI(title="Central Task Manager") | |
# ---- نماذج البيانات -------------------------------------------------------- | |
class RegisterRequest(BaseModel): | |
"""تسجيل أو تجديد ظهور العقدة.""" | |
url: str # مثلاً: "http://203.0.113.45:PORT/run" | |
load: float = 0.0 # نسبة تحميل العقدة (0.0 - 1.0)، اختياري | |
class TaskRequest(BaseModel): | |
func: str | |
args: List = [] | |
kwargs: Dict = {} | |
complexity: float = 0.0 | |
# ---- سجلّ العقد ------------------------------------------------------------ | |
# بنخزّن للعقدة: آخر timestamp و load | |
peers: Dict[str, Dict] = {} | |
HEARTBEAT_TTL = 60 # ثواني قبل اعتبار العقدة متوقفة | |
HEALTH_CHECK_FREQ = 30 # ثواني بين فحوص الصحة الداخلية | |
# ---- API للعقد لتسجيل نفسها ----------------------------------------------- | |
async def register_peer(req: RegisterRequest): | |
"""العقدة تستدعي هذه النقطة كلما انطلقت أو دورياً لتجديد ظهورها.""" | |
peers[req.url] = {"last_seen": time.time(), "load": req.load} | |
return {"status": "ok", "peers_count": len(peers)} | |
# ---- API للعمليات --------------------------------------------------------- | |
async def list_peers(): | |
"""يعيد قائمة بالعقد الصالحة بعد تنقية المتوقفة.""" | |
now = time.time() | |
# حذف العقد المتوقفة | |
for url, info in list(peers.items()): | |
if now - info["last_seen"] > HEARTBEAT_TTL: | |
peers.pop(url) | |
return list(peers.keys()) | |
async def dispatch_task(task: TaskRequest): | |
"""يتلقى مهمة ويعيد توجيهها لأفضل عقدة أو ينفذ محليّاً.""" | |
available = await list_peers() | |
if not available: | |
raise HTTPException(503, "لا توجد عقد متاحة حاليّاً") | |
# خوارزمية بسيطة: الاختيار بناءً على أقل تحميل معلن | |
# أو تدوير دائري إذا لم يعلن أحد عن تحميله | |
best = None | |
best_load = 1.1 | |
for url in available: | |
load = peers[url].get("load", None) | |
if load is None: | |
best = url | |
break | |
if load < best_load: | |
best, best_load = url, load | |
if not best: | |
best = available[0] | |
# إعادة توجيه الطلب | |
try: | |
resp = requests.post(best, json=task.dict(), timeout=10) | |
resp.raise_for_status() | |
return resp.json() | |
except Exception as e: | |
raise HTTPException(502, f"فشل التوجيه إلى {best}: {e}") | |
# ---- فحص دوري لصحة العقد --------------------------------------------------- | |
def health_check_loop(): | |
while True: | |
now = time.time() | |
for url in list(peers.keys()): | |
health_url = url.replace("/run", "/health") | |
try: | |
r = requests.get(health_url, timeout=3) | |
if r.status_code == 200: | |
peers[url]["last_seen"] = now | |
# يمكنك تحديث load من رد /health إذا وفّرته | |
else: | |
peers.pop(url) | |
except: | |
peers.pop(url) | |
time.sleep(HEALTH_CHECK_FREQ) | |
# ---- تشغيل الخلفيات وخادم FastAPI ------------------------------------------ | |
if __name__ == "__main__": | |
# شغل لوب الفحص الطبي في الخلفية | |
threading.Thread(target=health_check_loop, daemon=True).start() | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=1500) | |