import json
import os
import re
import time
import uuid
import asyncio
import threading
from typing import Any, Dict, List, Optional, TypedDict, Union, Generator
import requests
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
# Yupp Account Management
class YuppAccount(TypedDict):
token: str
is_valid: bool
last_used: float
error_count: int
# Global variables
VALID_CLIENT_KEYS: set = set()
YUPP_ACCOUNTS: List[YuppAccount] = []
YUPP_MODELS: List[Dict[str, Any]] = []
account_rotation_lock = threading.Lock()
MAX_ERROR_COUNT = 3
ERROR_COOLDOWN = 300 # 5 minutes cooldown for accounts with errors
DEBUG_MODE = os.environ.get("DEBUG_MODE", "false").lower() == "true"
# Pydantic Models
class ChatMessage(BaseModel):
role: str
content: Union[str, List[Dict[str, Any]]]
reasoning_content: Optional[str] = None
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
stream: bool = True
temperature: Optional[float] = None
max_tokens: Optional[int] = None
top_p: Optional[float] = None
    raw_response: bool = False  # kept for debugging
class ModelInfo(BaseModel):
id: str
object: str = "model"
created: int
owned_by: str
class ModelList(BaseModel):
object: str = "list"
data: List[ModelInfo]
class ChatCompletionChoice(BaseModel):
message: ChatMessage
index: int = 0
finish_reason: str = "stop"
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[ChatCompletionChoice]
usage: Dict[str, int] = Field(
default_factory=lambda: {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
}
)
class StreamChoice(BaseModel):
delta: Dict[str, Any] = Field(default_factory=dict)
index: int = 0
finish_reason: Optional[str] = None
class StreamResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion.chunk"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[StreamChoice]
# FastAPI App
app = FastAPI(title="Yupp.ai OpenAI API Adapter")
security = HTTPBearer(auto_error=False)
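# auto_error=False so a missing Authorization header reaches authenticate_client,
# which can then return a custom 401 instead of FastAPI's default error.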
def log_debug(message: str):
"""Debug日志函数"""
if DEBUG_MODE:
print(f"[DEBUG] {message}")
def load_client_api_keys():
"""Load client API keys from client_api_keys.json"""
global VALID_CLIENT_KEYS
try:
with open("client_api_keys.json", "r", encoding="utf-8") as f:
keys = json.load(f)
VALID_CLIENT_KEYS = set(keys) if isinstance(keys, list) else set()
print(f"Successfully loaded {len(VALID_CLIENT_KEYS)} client API keys.")
except FileNotFoundError:
print("Error: client_api_keys.json not found. Client authentication will fail.")
VALID_CLIENT_KEYS = set()
except Exception as e:
print(f"Error loading client_api_keys.json: {e}")
VALID_CLIENT_KEYS = set()
def load_yupp_accounts():
"""Load Yupp accounts from yupp.json"""
global YUPP_ACCOUNTS
YUPP_ACCOUNTS = []
try:
with open("yupp.json", "r", encoding="utf-8") as f:
accounts = json.load(f)
if not isinstance(accounts, list):
print("Warning: yupp.json should contain a list of account objects.")
return
for acc in accounts:
token = acc.get("token")
if token:
YUPP_ACCOUNTS.append({
"token": token,
"is_valid": True,
"last_used": 0,
"error_count": 0
})
print(f"Successfully loaded {len(YUPP_ACCOUNTS)} Yupp accounts.")
except FileNotFoundError:
print("Error: yupp.json not found. API calls will fail.")
except Exception as e:
print(f"Error loading yupp.json: {e}")
def load_yupp_models():
"""Load Yupp models from model.json"""
global YUPP_MODELS
try:
with open("model.json", "r", encoding="utf-8") as f:
YUPP_MODELS = json.load(f)
if not isinstance(YUPP_MODELS, list):
YUPP_MODELS = []
print("Warning: model.json should contain a list of model objects.")
return
print(f"Successfully loaded {len(YUPP_MODELS)} models.")
except FileNotFoundError:
print("Error: model.json not found. Model list will be empty.")
YUPP_MODELS = []
except Exception as e:
print(f"Error loading model.json: {e}")
YUPP_MODELS = []
def get_best_yupp_account() -> Optional[YuppAccount]:
"""Get the best available Yupp account using a smart selection algorithm."""
with account_rotation_lock:
now = time.time()
valid_accounts = [
acc for acc in YUPP_ACCOUNTS
if acc["is_valid"] and (
acc["error_count"] < MAX_ERROR_COUNT or
now - acc["last_used"] > ERROR_COOLDOWN
)
]
if not valid_accounts:
return None
# Reset error count for accounts that have been in cooldown
for acc in valid_accounts:
if acc["error_count"] >= MAX_ERROR_COUNT and now - acc["last_used"] > ERROR_COOLDOWN:
acc["error_count"] = 0
# Sort by last used (oldest first) and error count (lowest first)
valid_accounts.sort(key=lambda x: (x["last_used"], x["error_count"]))
account = valid_accounts[0]
account["last_used"] = now
return account
def format_messages_for_yupp(messages: List[ChatMessage]) -> str:
"""将多轮对话格式化为Yupp单轮对话格式"""
formatted = []
    # Handle system messages first
system_messages = [msg for msg in messages if msg.role == "system"]
if system_messages:
for sys_msg in system_messages:
content = sys_msg.content if isinstance(sys_msg.content, str) else json.dumps(sys_msg.content)
formatted.append(content)
    # Then user and assistant messages
user_assistant_msgs = [msg for msg in messages if msg.role != "system"]
for msg in user_assistant_msgs:
role = "Human" if msg.role == "user" else "Assistant"
content = msg.content if isinstance(msg.content, str) else json.dumps(msg.content)
formatted.append(f"\n\n{role}: {content}")
    # Ensure the prompt ends with an "Assistant:" cue
if not formatted or not formatted[-1].strip().startswith("Assistant:"):
formatted.append("\n\nAssistant:")
result = "".join(formatted)
    # Drop a leading "\n\n" if present
if result.startswith("\n\n"):
result = result[2:]
return result
async def authenticate_client(
auth: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
"""Authenticate client based on API key in Authorization header"""
if not VALID_CLIENT_KEYS:
raise HTTPException(
status_code=503,
detail="Service unavailable: Client API keys not configured on server.",
)
if not auth or not auth.credentials:
raise HTTPException(
status_code=401,
detail="API key required in Authorization header.",
headers={"WWW-Authenticate": "Bearer"},
)
if auth.credentials not in VALID_CLIENT_KEYS:
raise HTTPException(status_code=403, detail="Invalid client API key.")
@app.on_event("startup")
async def startup():
"""应用启动时初始化配置"""
print("Starting Yupp.ai OpenAI API Adapter server...")
load_client_api_keys()
load_yupp_accounts()
load_yupp_models()
print("Server initialization completed.")
@app.on_event("shutdown")
async def shutdown():
"""应用关闭时清理资源"""
print("Server shutdown completed.")
def get_models_list_response() -> ModelList:
"""Helper to construct ModelList response from cached models."""
model_infos = [
ModelInfo(
id=model.get("label", "unknown"),
created=int(time.time()),
owned_by=model.get("publisher", "unknown")
)
for model in YUPP_MODELS
]
return ModelList(data=model_infos)
@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
"""List available models - authenticated"""
return get_models_list_response()
@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
"""List available models without authentication - for client compatibility"""
return get_models_list_response()
@app.get("/debug")
async def toggle_debug(enable: bool = Query(None)):
"""切换调试模式"""
global DEBUG_MODE
if enable is not None:
DEBUG_MODE = enable
return {"debug_mode": DEBUG_MODE}
def claim_yupp_reward(account: YuppAccount, reward_id: str):
"""同步领取Yupp奖励"""
try:
log_debug(f"Claiming reward {reward_id}...")
url = "https://yupp.ai/api/trpc/reward.claim?batch=1"
payload = {"0": {"json": {"rewardId": reward_id}}}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
"Content-Type": "application/json",
"sec-fetch-site": "same-origin",
"Cookie": f"__Secure-yupp.session-token={account['token']}",
}
        response = requests.post(url, json=payload, headers=headers, timeout=30)
response.raise_for_status()
data = response.json()
balance = data[0]["result"]["data"]["json"]["currentCreditBalance"]
print(f"Reward claimed successfully. New balance: {balance}")
return balance
except Exception as e:
print(f"Failed to claim reward {reward_id}. Error: {e}")
return None
def yupp_stream_generator(response_lines, model_id: str, account: YuppAccount) -> Generator[str, None, None]:
"""处理Yupp的流式响应并转换为OpenAI格式"""
stream_id = f"chatcmpl-{uuid.uuid4().hex}"
created_time = int(time.time())
    # Emit the initial role delta
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'role': 'assistant'})]).json()}\n\n"
line_pattern = re.compile(b"^([0-9a-fA-F]+):(.*)")
    chunks = {}
    select_stream: List[Dict[str, Any]] = []  # populated from chunk "1" (left/right candidate streams)
    target_stream_id = None
    reward_info = None
    is_thinking = False
    thinking_content = ""
    normal_content = ""
def extract_ref_id(ref):
"""从引用字符串中提取ID,例如从'$@123'提取'123'"""
return ref[2:] if ref and isinstance(ref, str) and ref.startswith("$@") else None
try:
for line in response_lines:
if not line:
continue
match = line_pattern.match(line)
if not match:
continue
chunk_id, chunk_data = match.groups()
chunk_id = chunk_id.decode()
try:
data = json.loads(chunk_data) if chunk_data != b"{}" else {}
chunks[chunk_id] = data
except json.JSONDecodeError:
continue
            # Reward information
if chunk_id == "a":
reward_info = data
            # Initial stream setup
elif chunk_id == "1":
left_stream = data.get("leftStream", {})
right_stream = data.get("rightStream", {})
select_stream = [left_stream, right_stream]
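            # Yupp returns two candidate streams (left/right); chunk "e" lists the
            # model selections, and we follow the stream whose selection source is
            # USER_SELECTED.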
elif chunk_id == "e":
for i, selection in enumerate(data.get("modelSelections", [])):
if selection.get("selectionSource") == "USER_SELECTED":
target_stream_id = extract_ref_id(select_stream[i].get("next"))
break
            # Content from the selected model's stream
elif target_stream_id and chunk_id == target_stream_id:
content = data.get("curr", "")
if content:
                    # Handle <think> reasoning blocks
if "<think>" in content:
parts = content.split("<think>", 1)
                        if parts[0]:  # content before the <think> tag
normal_content += parts[0]
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'content': parts[0]})]).json()}\n\n"
is_thinking = True
thinking_part = parts[1]
if "</think>" in thinking_part:
think_parts = thinking_part.split("</think>", 1)
thinking_content += think_parts[0]
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'reasoning_content': think_parts[0]})]).json()}\n\n"
is_thinking = False
                            if think_parts[1]:  # content after the </think> tag
normal_content += think_parts[1]
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'content': think_parts[1]})]).json()}\n\n"
else:
thinking_content += thinking_part
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'reasoning_content': thinking_part})]).json()}\n\n"
elif "</think>" in content and is_thinking:
parts = content.split("</think>", 1)
thinking_content += parts[0]
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'reasoning_content': parts[0]})]).json()}\n\n"
is_thinking = False
                        if parts[1]:  # content after the </think> tag
normal_content += parts[1]
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'content': parts[1]})]).json()}\n\n"
elif is_thinking:
thinking_content += content
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'reasoning_content': content})]).json()}\n\n"
else:
normal_content += content
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={'content': content})]).json()}\n\n"
                # Advance to the next chunk reference in the target stream
target_stream_id = extract_ref_id(data.get("next"))
except Exception as e:
print(f"Stream processing error: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
        # Emit the finish chunk and the DONE marker
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model_id, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n"
yield "data: [DONE]\n\n"
        # Claim any pending reward once the stream has finished
if reward_info and "unclaimedRewardInfo" in reward_info:
reward_id = reward_info["unclaimedRewardInfo"].get("rewardId")
if reward_id:
try:
claim_yupp_reward(account, reward_id)
except Exception as e:
print(f"Failed to claim reward in background: {e}")
def build_yupp_non_stream_response(response_lines, model_id: str, account: YuppAccount) -> ChatCompletionResponse:
"""构建非流式响应"""
full_content = ""
full_reasoning_content = ""
reward_id = None
for event in yupp_stream_generator(response_lines, model_id, account):
if event.startswith("data:"):
data_str = event[5:].strip()
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
if "error" in data:
raise HTTPException(status_code=500, detail=data["error"]["message"])
delta = data.get("choices", [{}])[0].get("delta", {})
if "content" in delta:
full_content += delta["content"]
if "reasoning_content" in delta:
full_reasoning_content += delta["reasoning_content"]
except json.JSONDecodeError:
continue
    # Assemble the full response
return ChatCompletionResponse(
model=model_id,
choices=[
ChatCompletionChoice(
message=ChatMessage(
role="assistant",
content=full_content,
reasoning_content=full_reasoning_content if full_reasoning_content else None,
)
)
],
)
@app.post("/v1/chat/completions")
async def chat_completions(
request: ChatCompletionRequest, _: None = Depends(authenticate_client)
):
"""使用Yupp.ai创建聊天完成"""
# 查找模型
model_info = next((m for m in YUPP_MODELS if m.get("label") == request.model), None)
if not model_info:
raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found.")
model_name = model_info.get("name")
if not model_name:
raise HTTPException(status_code=404, detail=f"Model '{request.model}' has no 'name' field.")
if not request.messages:
raise HTTPException(status_code=400, detail="No messages provided in the request.")
log_debug(f"Processing request for model: {request.model} (Yupp name: {model_name})")
    # Format the conversation into a single prompt
question = format_messages_for_yupp(request.messages)
log_debug(f"Formatted question: {question[:100]}...")
    # Try each account in turn
for attempt in range(len(YUPP_ACCOUNTS)):
account = get_best_yupp_account()
if not account:
raise HTTPException(
status_code=503,
detail="No valid Yupp.ai accounts available."
)
try:
            # Build the request
url_uuid = str(uuid.uuid4())
url = f"https://yupp.ai/chat/{url_uuid}"
payload = [
url_uuid,
str(uuid.uuid4()),
question,
"$undefined",
"$undefined",
[],
"$undefined",
[{"modelName": model_name, "promptModifierId": "$undefined"}],
"text",
False,
]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
"Accept": "text/x-component",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Content-Type": "application/json",
"next-action": "7f48888536e2f0c0163640837db291777c39cc40c3",
"sec-fetch-site": "same-origin",
"Cookie": f"__Secure-yupp.session-token={account['token']}",
}
log_debug(f"Sending request to Yupp.ai with account token ending in ...{account['token'][-4:]}")
            # Send the request
response = requests.post(
url,
data=json.dumps(payload),
headers=headers,
stream=True
)
response.raise_for_status()
            # Handle the response
if request.stream:
log_debug("Returning processed response stream")
return StreamingResponse(
yupp_stream_generator(response.iter_lines(), request.model, account),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
else:
log_debug("Building non-stream response")
                return build_yupp_non_stream_response(response.iter_lines(), request.model, account)
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
error_detail = e.response.text
print(f"Yupp.ai API error ({status_code}): {error_detail}")
with account_rotation_lock:
if status_code in [401, 403]:
account["is_valid"] = False
print(f"Account ...{account['token'][-4:]} marked as invalid due to auth error.")
elif status_code in [429, 500, 502, 503, 504]:
account["error_count"] += 1
print(f"Account ...{account['token'][-4:]} error count: {account['error_count']}")
else:
                    # Client error: do not retry with other accounts
raise HTTPException(status_code=status_code, detail=error_detail)
except Exception as e:
print(f"Request error: {e}")
with account_rotation_lock:
account["error_count"] += 1
    # All attempts failed
raise HTTPException(status_code=503, detail="All attempts to contact Yupp.ai API failed.")
async def error_stream_generator(error_detail: str, status_code: int):
"""生成错误流响应"""
yield f'data: {json.dumps({"error": {"message": error_detail, "type": "yupp_api_error", "code": status_code}})}\n\n'
yield "data: [DONE]\n\n"
if __name__ == "__main__":
import uvicorn
    # Enable debug mode via environment variable
if os.environ.get("DEBUG_MODE", "").lower() == "true":
DEBUG_MODE = True
print("Debug mode enabled via environment variable")
if not os.path.exists("yupp.json"):
print("Warning: yupp.json not found. Creating a dummy file.")
dummy_data = [
{
"token": "your_yupp_session_token_here",
}
]
with open("yupp.json", "w", encoding="utf-8") as f:
json.dump(dummy_data, f, indent=4)
print("Created dummy yupp.json. Please replace with valid Yupp.ai data.")
if not os.path.exists("client_api_keys.json"):
print("Warning: client_api_keys.json not found. Creating a dummy file.")
dummy_key = f"sk-dummy-{uuid.uuid4().hex}"
with open("client_api_keys.json", "w", encoding="utf-8") as f:
json.dump([dummy_key], f, indent=2)
print(f"Created dummy client_api_keys.json with key: {dummy_key}")
if not os.path.exists("model.json"):
print("Warning: model.json not found. Creating a dummy file.")
dummy_models = [
{
"id": "claude-3.7-sonnet:thinking",
"name": "anthropic/claude-3.7-sonnet:thinking<>OPR",
"label": "Claude 3.7 Sonnet (Thinking) (OpenRouter)",
"publisher": "Anthropic",
"family": "Claude"
}
]
with open("model.json", "w", encoding="utf-8") as f:
json.dump(dummy_models, f, indent=4)
print("Created dummy model.json.")
load_client_api_keys()
load_yupp_accounts()
load_yupp_models()
print("\n--- Yupp.ai OpenAI API Adapter ---")
print(f"Debug Mode: {DEBUG_MODE}")
print("Endpoints:")
print(" GET /v1/models (Client API Key Auth)")
print(" GET /models (No Auth)")
print(" POST /v1/chat/completions (Client API Key Auth)")
print(" GET /debug?enable=[true|false] (Toggle Debug Mode)")
print(f"\nClient API Keys: {len(VALID_CLIENT_KEYS)}")
if YUPP_ACCOUNTS:
print(f"Yupp.ai Accounts: {len(YUPP_ACCOUNTS)}")
else:
print("Yupp.ai Accounts: None loaded. Check yupp.json.")
if YUPP_MODELS:
models = sorted([m.get("label", m.get("id", "unknown")) for m in YUPP_MODELS])
print(f"Yupp.ai Models: {len(YUPP_MODELS)}")
print(f"Available models: {', '.join(models[:5])}{'...' if len(models) > 5 else ''}")
else:
print("Yupp.ai Models: None loaded. Check model.json.")
print("------------------------------------")
uvicorn.run(app, host="0.0.0.0", port=8000)
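
# Example client call: a rough sketch for reference. The API key and model label
# below are placeholders; use a key from client_api_keys.json and a "label" from
# model.json.
#
#   import requests
#   resp = requests.post(
#       "http://localhost:8000/v1/chat/completions",
#       headers={"Authorization": "Bearer sk-your-client-key"},
#       json={
#           "model": "Claude 3.7 Sonnet (Thinking) (OpenRouter)",
#           "messages": [{"role": "user", "content": "Hello"}],
#           "stream": False,
#       },
#       timeout=300,
#   )
#   print(resp.json()["choices"][0]["message"]["content"])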