code-backend / app.py
sixfingerdev's picture
Update app.py
9ffc7b3 verified
"""
SixFingerDev Arena - Multi-Model Agentic Backend
Supports: GPT-5.1-Codex, Claude Opus 4.5, o3, o3-mini
Runs on Hugging Face Spaces
"""
from fastapi import FastAPI, HTTPException, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel
from typing import List, Optional, Dict, Any, AsyncGenerator
import requests
import os
import secrets
from datetime import datetime
import time
import json
import asyncio
from collections import defaultdict
# Application instance; title, description and version are passed to FastAPI.
app = FastAPI(
title="SixFingerDev Arena Backend",
description="Multi-Model AI Backend with Task Routing",
version="2.0.0"
)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================
# CONFIGURATION
# ============================================
# Per-model settings, keyed by the public model name that clients send.
# "api_name" is the value placed in the upstream request's "model" field;
# "temperature" and "max_tokens" are the defaults used when a request does
# not override them.
MODEL_CONFIGS = {
    "gpt-5.1-codex": {
        "provider": "pollinations",
        "api_name": "openai",
        "capabilities": ["code", "debug", "test"],
        "max_tokens": 200000,
        "temperature": 0.7,
    },
    "claude-opus-4.5": {
        "provider": "pollinations",
        "api_name": "claude-opus-4",
        "capabilities": ["plan", "modify", "test"],
        "max_tokens": 200000,
        "temperature": 0.7,
    },
    "o3": {
        "provider": "pollinations",
        "api_name": "openai",
        "capabilities": ["debug", "test"],
        "max_tokens": 4096,
        "temperature": 0.7,
    },
    "o3-mini": {
        "provider": "pollinations",
        "api_name": "openai",
        "capabilities": ["test"],
        "max_tokens": 2048,
        "temperature": 0.7,
    },
}

# Task type -> model name used when the client does not choose a model.
TASK_MODELS = {
    "Planner": "claude-opus-4.5",   # architectural vision + roadmap
    "Coder": "gpt-5.1-codex",       # state-of-the-art code generation
    "Tester": "claude-opus-4.5",    # different perspective, edge-case coverage
    "Debugger": "gpt-5.1-codex",    # cybersecurity + vulnerability detection
    "Modifier": "claude-opus-4.5",  # safe refactoring
}
# API keys
# Raw upstream key list read from the AI_API_KEYS environment variable
# ('' when the variable is unset).
API_KEYS_RAW = os.getenv('AI_API_KEYS', '')
# Key expected from this backend's own clients. When BACKEND_API_KEY is unset
# a random token is generated at import time, so it differs on every start.
BACKEND_API_KEY = os.getenv('BACKEND_API_KEY', secrets.token_urlsafe(32))
def parse_api_keys(raw: Optional[str] = None) -> List[str]:
    """Parse the configured upstream API keys into a list of strings.

    Two formats are accepted: a JSON array (``'["k1", "k2"]'``) or a
    comma-separated string (``'k1, k2'``).  Text that starts with ``[`` but is
    not valid JSON falls back to comma splitting.

    Args:
        raw: Text to parse.  Defaults to the module-level ``API_KEYS_RAW``.

    Returns:
        The keys with surrounding whitespace removed and blank/null entries
        dropped; an empty list when nothing is configured.
    """
    if raw is None:
        raw = API_KEYS_RAW
    text = raw.strip()
    if not text:
        return []
    if text.startswith('['):
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            # Not valid JSON after all: treat it as a comma-separated list.
            decoded = None
        if isinstance(decoded, list):
            # Normalise entries so a null or blank element can never end up
            # being used as a key.
            return [
                str(item).strip()
                for item in decoded
                if item is not None and str(item).strip()
            ]
    return [part.strip() for part in text.split(',') if part.strip()]
# Upstream keys, parsed once at import time.
API_KEYS = parse_api_keys()
# Pollinations chat-completions endpoint that every model call is posted to.
POLLINATIONS_URL = "https://gen.pollinations.ai/v1/chat/completions"
# Rate limiting: client id -> list of time.time() stamps of its recent requests.
REQUEST_COUNTS = defaultdict(list)
MAX_REQUESTS_PER_MINUTE = 60
# Session storage (in-memory): session id -> session dict.
SESSIONS = {}
# ============================================
# MODELS
# ============================================
class Message(BaseModel):
    """A single chat message: the speaker's role and the message text."""

    role: str
    content: str
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    session_id: str
    message: str
    # Model name; auto-selected when None.
    model: Optional[str] = None
    # One of Planner, Coder, Tester, Debugger, Modifier; auto-detected when None.
    task_type: Optional[str] = None
    stream: Optional[bool] = True
    # Optional per-request overrides of the sampling settings.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
class TaskDetectionRequest(BaseModel):
    """Request body for task-type detection."""

    message: str
    # Optional earlier messages; defaults to an empty list.
    context: Optional[List[Message]] = []
class HealthResponse(BaseModel):
    """Response schema for the health-check route."""

    status: str
    timestamp: str
    available_models: List[str]
    task_mappings: Dict[str, str]
    version: str
# ============================================
# KEY MANAGER
# ============================================
class APIKeyManager:
    """Round-robin rotation over upstream API keys with a failure cooldown.

    A key reported as failed is skipped until ``cooldown`` seconds have
    passed.  When every key is cooling down, the one that failed longest ago
    is released and handed out again.
    """

    def __init__(self, keys: List[str]):
        # An empty key list is stored as [None], meaning "send no key".
        self.keys = keys or [None]
        self.failed_keys = {}    # key -> time.time() of its latest failure
        self.current_index = 0   # position of the key that is tried first
        self.cooldown = 60       # seconds a failed key is skipped

    def get_working_key(self) -> Optional[str]:
        """Return the next usable key, or None when no keys are configured."""
        if not self.keys or self.keys[0] is None:
            return None

        checked_at = time.time()
        total = len(self.keys)
        # Visit each key at most once, starting from the current position.
        for _ in range(total):
            candidate = self.keys[self.current_index]
            if candidate not in self.failed_keys:
                return candidate
            if checked_at - self.failed_keys[candidate] > self.cooldown:
                # Cooldown expired: forgive the key and use it.
                del self.failed_keys[candidate]
                return candidate
            self.current_index = (self.current_index + 1) % total

        # Every key is still cooling down: release the oldest failure.
        if self.failed_keys:
            oldest = min(self.failed_keys, key=lambda k: self.failed_keys[k])
            self.failed_keys.pop(oldest)
            return oldest
        return self.keys[0] if self.keys else None

    def mark_failed(self, key: Optional[str]):
        """Record a failure for ``key`` and advance the rotation."""
        if not key:
            return
        self.failed_keys[key] = time.time()
        self.current_index = (self.current_index + 1) % len(self.keys)

    def mark_success(self, key: Optional[str]):
        """Clear any recorded failure for ``key``."""
        if key:
            self.failed_keys.pop(key, None)
# Module-level key manager built from the parsed upstream keys.
key_manager = APIKeyManager(API_KEYS)
# ============================================
# TASK DETECTION
# ============================================
# Ordered (task type, keywords) table: the first entry with a matching
# keyword wins, so the order of the rows is significant.
_TASK_KEYWORDS = (
    ("Planner", ('plan', 'tasarla', 'mimari', 'architecture', 'design', 'roadmap')),
    ("Coder", ('kod yaz', 'implement', 'create', 'build', 'develop', 'function')),
    ("Tester", ('test', 'kontrol et', 'check', 'verify', 'validate')),
    ("Debugger", ('hata', 'bug', 'debug', 'fix', 'error', 'düzelt')),
    ("Modifier", ('değiştir', 'modify', 'update', 'refactor', 'optimize')),
)


def detect_task_type(message: str) -> str:
    """Guess the task type of a message from keywords it contains.

    Matching is a case-insensitive substring test against English and
    Turkish keywords, checked in the order Planner, Coder, Tester, Debugger,
    Modifier.

    Args:
        message: The user's message text.

    Returns:
        One of "Planner", "Coder", "Tester", "Debugger" or "Modifier";
        "Coder" when no keyword matches (general chat).
    """
    # str.lower() turns the Turkish capital "İ" (U+0130) into "i" followed by
    # COMBINING DOT ABOVE (U+0307); drop that mark so upper-case Turkish input
    # such as "MİMARİ" still matches the lower-case keywords.
    normalized = message.lower().replace('\u0307', '')
    for task_type, keywords in _TASK_KEYWORDS:
        if any(keyword in normalized for keyword in keywords):
            return task_type
    # Default: general conversation is handled by the Coder.
    return "Coder"
# ============================================
# SESSION MANAGEMENT
# ============================================
def get_or_create_session(session_id: str) -> Dict:
    """Return the session stored under ``session_id``, creating it on first use.

    Every call refreshes the session's ``last_activity`` timestamp.
    """
    if session_id not in SESSIONS:
        SESSIONS[session_id] = dict(
            messages=[],
            created_at=datetime.utcnow().isoformat(),
            last_activity=datetime.utcnow().isoformat(),
            metadata={},
        )
    session = SESSIONS[session_id]
    session["last_activity"] = datetime.utcnow().isoformat()
    return session
def add_message_to_session(session_id: str, role: str, content: str):
    """Append a timestamped message to the session's history.

    The session is created first when it does not exist yet.
    """
    history = get_or_create_session(session_id)["messages"]
    entry = dict(
        role=role,
        content=content,
        timestamp=datetime.utcnow().isoformat(),
    )
    history.append(entry)
# ============================================
# MIDDLEWARE
# ============================================
def verify_api_key(authorization: Optional[str] = Header(None)):
    """Validate an ``Authorization: Bearer <key>`` header against BACKEND_API_KEY.

    NOTE(review): none of the routes visible in this file declare this
    function as a dependency — confirm whether that is intended.

    Args:
        authorization: Raw value of the Authorization header.

    Returns:
        The bearer token when it matches the backend key.

    Raises:
        HTTPException: 401 when the header is missing, is not a Bearer
            credential, or carries the wrong key.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing authorization header")
    scheme = "Bearer "
    if not authorization.startswith(scheme):
        raise HTTPException(status_code=401, detail="Invalid authorization format")
    # Strip only the leading scheme; str.replace would also delete any later
    # "Bearer " occurring inside the token itself.
    token = authorization[len(scheme):]
    # Constant-time comparison so response timing does not leak how much of
    # the key matched.  Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not secrets.compare_digest(
        token.encode("utf-8"), BACKEND_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return token
def rate_limit_check(client_id: str):
    """Enforce the per-client limit over a sliding 60-second window.

    Timestamps older than 60 seconds are discarded, then the request is
    either recorded or rejected.

    Raises:
        HTTPException: 429 when the client already has
            MAX_REQUESTS_PER_MINUTE requests inside the window.
    """
    current = time.time()
    window_start = current - 60
    recent = list(
        filter(lambda stamp: stamp > window_start, REQUEST_COUNTS[client_id])
    )
    REQUEST_COUNTS[client_id] = recent
    if len(recent) >= MAX_REQUESTS_PER_MINUTE:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    recent.append(current)
# ============================================
# CORE API FUNCTIONS
# ============================================
async def call_model_api(
    model: str,
    messages: List[Dict],
    stream: bool = False,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None
) -> Any:
    """Send a chat-completion request for ``model`` to the upstream API.

    Up to three attempts are made.  On 401/403/429 the current key is marked
    as failed and the next key is tried; other errors and network failures
    are retried after a short pause.

    Args:
        model: Public model name; must be a key of MODEL_CONFIGS.
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
        stream: When True the open ``requests`` response is returned so the
            caller can iterate it; the caller is then responsible for
            closing it.
        temperature: Sampling temperature; the model default when None.
        max_tokens: Token limit; the model default when None or 0.

    Returns:
        The decoded JSON body, or the streaming response when ``stream``.

    Raises:
        HTTPException: 400 for an unknown model, the upstream status for a
            non-retryable API error on the last attempt, 504 on timeout,
            500 on other request failures, 503 when all retries are used up.
    """
    if model not in MODEL_CONFIGS:
        raise HTTPException(status_code=400, detail=f"Unknown model: {model}")
    config = MODEL_CONFIGS[model]

    api_key = key_manager.get_working_key()
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    payload = {
        "model": config["api_name"],
        "messages": messages,
        "stream": stream,
        # Compare against None: an explicit temperature of 0.0 is valid and
        # must not be replaced by the default.
        "temperature": config["temperature"] if temperature is None else temperature,
        "max_tokens": max_tokens or config["max_tokens"],
    }

    loop = asyncio.get_running_loop()
    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            # requests is a blocking client; run it in the default thread
            # pool so the event loop keeps serving other requests meanwhile.
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    POLLINATIONS_URL,
                    json=payload,
                    headers=headers,
                    stream=stream,
                    timeout=120,
                ),
            )
            status = response.status_code
            if status == 200:
                key_manager.mark_success(api_key)
                if stream:
                    return response
                return response.json()

            # Release the connection: with stream=True the body of an error
            # response would otherwise never be consumed or closed.
            response.close()

            if status in (401, 403, 429):
                # Key rejected or throttled: rotate to the next key.
                key_manager.mark_failed(api_key)
                api_key = key_manager.get_working_key()
                if api_key:
                    headers["Authorization"] = f"Bearer {api_key}"
                if status != 401 and not is_last_attempt:
                    await asyncio.sleep(1)
                continue

            if not is_last_attempt:
                await asyncio.sleep(2)
                continue
            raise HTTPException(status_code=status, detail=f"API error: {status}")
        except requests.exceptions.Timeout:
            if not is_last_attempt:
                await asyncio.sleep(2)
                continue
            raise HTTPException(status_code=504, detail="Request timeout")
        except requests.exceptions.RequestException as e:
            if not is_last_attempt:
                await asyncio.sleep(1)
                continue
            raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}")
    raise HTTPException(status_code=503, detail="All retries failed")
async def stream_generator(response) -> AsyncGenerator[str, None]:
    """Convert an upstream streaming response into this backend's SSE events.

    Each upstream ``data:`` line carrying a content delta is re-emitted as
    ``data: {"chunk": "<text>"}``.  Exactly one ``data: {"done": true}``
    event is emitted at the end, whether the upstream sends ``[DONE]`` or
    simply stops.  The response is always closed.

    Args:
        response: Object with ``iter_lines()`` yielding bytes lines and a
            ``close()`` method (a streaming ``requests`` response).

    Yields:
        SSE-formatted event strings, each terminated by a blank line.
    """
    exhausted = object()
    loop = asyncio.get_running_loop()
    try:
        lines = iter(response.iter_lines())
        while True:
            # iter_lines() blocks on the socket; fetch each line in a worker
            # thread so the event loop is not stalled.  The sentinel avoids
            # StopIteration escaping through the future.
            line = await loop.run_in_executor(None, next, lines, exhausted)
            if line is exhausted:
                break
            if not line:
                continue
            text = line.decode('utf-8')
            if not text.startswith("data:"):
                continue
            data_str = text[5:].strip()
            if data_str == "[DONE]":
                break
            try:
                data = json.loads(data_str)
            except json.JSONDecodeError:
                continue
            choices = data.get("choices") if isinstance(data, dict) else None
            if not isinstance(choices, list) or not choices:
                continue
            first = choices[0]
            if not isinstance(first, dict):
                continue
            # "delta" (or its "content") may be missing or null.
            delta = first.get("delta") or {}
            content = delta.get("content") if isinstance(delta, dict) else None
            if content:
                chunk_json = json.dumps({'chunk': content})
                yield f"data: {chunk_json}\n\n"
        done_json = json.dumps({'done': True})
        yield f"data: {done_json}\n\n"
    finally:
        response.close()
# ============================================
# ROUTES
# ============================================
@app.get("/", response_model=HealthResponse)
async def health_check():
    """Report service status, current UTC time, model names and task routing."""
    model_names = list(MODEL_CONFIGS.keys())
    checked_at = datetime.utcnow().isoformat()
    return dict(
        status="healthy",
        timestamp=checked_at,
        available_models=model_names,
        task_mappings=TASK_MODELS,
        version="2.0.0",
    )
# Task-specific system prompts, keyed by task type.
_SYSTEM_PROMPTS = {
    "Planner": "Sen deneyimli bir yazılım mimarısın. Detaylı planlama ve tasarım yap.",
    "Coder": "Sen expert bir kod geliştiricisisin. Temiz, okunabilir ve performanslı kod yaz.",
    "Tester": "Sen titiz bir test mühendisisin. Comprehensive test senaryoları oluştur.",
    "Debugger": "Sen yetenekli bir debugger'sın. Hataları kök nedenine inip çöz.",
    "Modifier": "Sen dikkatli bir refactoring uzmanısın. Mevcut kodu bozmadan değiştir.",
}


def _extract_chunk(event: str) -> str:
    """Return the text of a ``data: {"chunk": ...}`` SSE event, else ''."""
    if not event.startswith("data: "):
        return ""
    try:
        body = json.loads(event[6:])
    except json.JSONDecodeError:
        return ""
    chunk = body.get("chunk") if isinstance(body, dict) else None
    return chunk if isinstance(chunk, str) else ""


async def _stream_and_record(session_id: str, response) -> AsyncGenerator[str, None]:
    """Forward streamed SSE events and store the assembled reply in the session."""
    pieces = []
    try:
        async for event in stream_generator(response):
            text = _extract_chunk(event)
            if text:
                pieces.append(text)
            yield event
    finally:
        # Runs on normal completion and on client disconnect alike, so the
        # history keeps whatever part of the reply was produced.
        if pieces:
            add_message_to_session(session_id, "assistant", "".join(pieces))


@app.post("/api/chat")
async def chat(request: ChatRequest):
    """
    Main chat endpoint with task-based model routing.

    Body:
    {
    "session_id": "sf_xxx",
    "message": "Write a tokenizer",
    "model": "gpt-5.1-codex", // optional
    "task_type": "Coder", // optional
    "stream": true
    }

    The task type is taken from the request or detected from the message;
    the model is taken from the request or chosen from the task type.  The
    whole session history, preceded by a task-specific system prompt, is
    sent to the model.  With ``stream`` true the reply is returned as
    server-sent events, otherwise as the upstream JSON body.  In both modes
    the assistant reply is appended to the session history.

    NOTE(review): this route does not depend on verify_api_key, so it is
    reachable without the backend key — confirm that is intended.

    Raises:
        HTTPException: 429 when rate limited, 400 for an unknown model,
            502 when the upstream body has no assistant message, plus any
            error raised by call_model_api.
    """
    # Rate limiting (keyed by session id)
    rate_limit_check(request.session_id)

    # Resolve task type and model first, and reject an unknown model before
    # touching the session so a bad request leaves no dangling user message.
    task_type = request.task_type or detect_task_type(request.message)
    model = request.model or TASK_MODELS.get(task_type, "gpt-5.1-codex")
    if model not in MODEL_CONFIGS:
        raise HTTPException(status_code=400, detail=f"Unknown model: {model}")

    # Get/create the session and record the user message
    session = get_or_create_session(request.session_id)
    add_message_to_session(request.session_id, "user", request.message)

    # Upstream messages: only role/content are forwarded (timestamps dropped)
    messages = [
        {"role": msg["role"], "content": msg["content"]}
        for msg in session["messages"]
    ]
    # Prepend the task-specific system prompt
    if task_type in _SYSTEM_PROMPTS:
        messages.insert(0, {"role": "system", "content": _SYSTEM_PROMPTS[task_type]})

    if request.stream:
        response = await call_model_api(
            model=model,
            messages=messages,
            stream=True,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        return StreamingResponse(
            _stream_and_record(request.session_id, response),
            media_type="text/event-stream"
        )

    result = await call_model_api(
        model=model,
        messages=messages,
        stream=False,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    # Store the assistant reply in the session
    try:
        assistant_message = result["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        raise HTTPException(status_code=502, detail="Malformed response from model API")
    if assistant_message is not None:
        add_message_to_session(request.session_id, "assistant", assistant_message)
    return result
@app.post("/api/detect-task")
async def detect_task(request: TaskDetectionRequest):
    """Detect the task type of a message and report the model mapped to it."""
    detected = detect_task_type(request.message)
    suggestion = TASK_MODELS.get(detected)
    body = {"task_type": detected, "recommended_model": suggestion}
    # Empty config when the task type has no mapped model.
    body["model_config"] = MODEL_CONFIGS.get(suggestion, {})
    return body
@app.get("/api/session/{session_id}")
async def get_session(session_id: str):
    """Return the stored session, or respond 404 when it does not exist."""
    try:
        return SESSIONS[session_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Session not found")
@app.delete("/api/session/{session_id}")
async def delete_session(session_id: str):
    """Delete the stored session, or respond 404 when it does not exist."""
    if session_id not in SESSIONS:
        raise HTTPException(status_code=404, detail="Session not found")
    del SESSIONS[session_id]
    return {"status": "deleted"}
@app.get("/api/models")
async def list_models():
    """List the configured models and the task-to-model mapping."""
    return dict(models=MODEL_CONFIGS, task_mappings=TASK_MODELS)
@app.get("/api/stats")
async def get_stats():
    """Return counters for sessions, upstream keys, clients and requests."""
    has_real_keys = bool(API_KEYS) and API_KEYS[0] is not None
    key_total = len(API_KEYS) if has_real_keys else 0
    # Sum of the timestamps currently held for every client.
    tracked_requests = sum(map(len, REQUEST_COUNTS.values()))
    return {
        "total_sessions": len(SESSIONS),
        "total_keys": key_total,
        "failed_keys": len(key_manager.failed_keys),
        "active_clients": len(REQUEST_COUNTS),
        "requests_last_minute": tracked_requests,
        "available_models": list(MODEL_CONFIGS.keys()),
    }
# ============================================
# ERROR HANDLERS
# ============================================
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Render an HTTPException as ``{"error": {message, type, code}}``.

    The exception's status code is used for the response, and any headers
    attached to the exception are passed through to the client.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "message": exc.detail,
                "type": "api_error",
                "code": exc.status_code
            }
        },
        # Preserve headers set on the exception (None when there are none).
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Log an unhandled exception and answer with a generic 500 error body.

    The exception text is printed to stdout only; it is not sent to the
    client.
    """
    print(f"Unhandled exception: {exc}")
    error_body = {
        "message": "Internal server error",
        "type": "internal_error",
        "code": 500,
    }
    return JSONResponse(status_code=500, content={"error": error_body})
# ============================================
# STARTUP
# ============================================
@app.on_event("startup")
async def startup_event():
    """Print a startup banner summarising the loaded configuration."""
    rule = "=" * 60
    has_real_keys = bool(API_KEYS) and API_KEYS[0] is not None
    key_total = len(API_KEYS) if has_real_keys else 0

    print(rule)
    print("🚀 SixFingerDev Arena Starting...")
    print(rule)
    print(f"📦 Available Models: {', '.join(MODEL_CONFIGS.keys())}")
    print(f"🎯 Task Mappings:")
    for task, model in TASK_MODELS.items():
        print(f" {task}: {model}")
    print(f"🔑 API Keys: {key_total}")
    # NOTE(review): the first 10 characters of the backend key end up in the
    # process output — confirm this is acceptable for the deployment.
    print(f"🔐 Backend Key: {BACKEND_API_KEY[:10]}...")
    print(f"⏱️ Rate Limit: {MAX_REQUESTS_PER_MINUTE} req/min")
    print(rule)
    print("✅ Arena Ready!")
    print(rule)
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    # uvicorn is imported here because it is only needed when run directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)