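"""OpenAI-compatible proxy in front of Dify applications.

Each Dify API key in DIFY_API_KEYS backs one app, and the app's name is
exposed as the OpenAI "model" name via /v1/models. Because Dify's
/chat-messages endpoint takes a single query string, chat history is carried
either by replaying it inside the prompt (CONVERSATION_MEMORY_MODE=1) or by
hiding Dify's conversation_id in each reply as invisible characters
(CONVERSATION_MEMORY_MODE=2).

Example request, assuming the default port 7860, a client key listed in
VALID_API_KEYS, and a Dify app named "my-app" (key and app name are
placeholders):

    curl http://localhost:7860/v1/chat/completions \
      -H "Authorization: Bearer sk-my-key" \
      -H "Content-Type: application/json" \
      -d '{"model": "my-app", "messages": [{"role": "user", "content": "Hi"}]}'
"""
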
import asyncio
import base64
import json
import logging
import os
import time

import httpx
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, request, stream_with_context

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Keep httpx's per-request logging down to warnings.
logging.getLogger("httpx").setLevel(logging.WARNING)

load_dotenv()

VALID_API_KEYS = [key.strip() for key in os.getenv("VALID_API_KEYS", "").split(",") if key.strip()]

DIFY_API_BASE = os.getenv("DIFY_API_BASE", "")

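# Conversation memory modes:
#   1 (default, "history_message" mode): replay prior messages inside the query
#     text, so each Dify call is stateless.
#   2 ("zero-width character" mode): reuse Dify's server-side memory by hiding
#     the conversation_id in every assistant reply as invisible characters.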
CONVERSATION_MEMORY_MODE = int(os.getenv('CONVERSATION_MEMORY_MODE', '1'))


class DifyModelManager:
    """Maps Dify applications (one API key each) to OpenAI-style model names."""

    def __init__(self):
        self.api_keys = []
        self.name_to_api_key = {}
        self.api_key_to_name = {}
        self.load_api_keys()

    def load_api_keys(self):
        """Load Dify API keys from the environment."""
        api_keys_str = os.getenv('DIFY_API_KEYS', '')
        if api_keys_str:
            self.api_keys = [key.strip() for key in api_keys_str.split(',') if key.strip()]
            logger.info(f"Loaded {len(self.api_keys)} API keys")

    async def fetch_app_info(self, api_key):
        """Fetch the Dify app's name for the given API key."""
        try:
            async with httpx.AsyncClient() as client:
                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                }
                response = await client.get(
                    f"{DIFY_API_BASE}/info",
                    headers=headers,
                    params={"user": "default_user"}
                )

                if response.status_code == 200:
                    app_info = response.json()
                    return app_info.get("name", "Unknown App")
                else:
                    logger.error(f"Failed to fetch app info for API key: {api_key[:8]}...")
                    return None
        except Exception as e:
            logger.error(f"Error fetching app info: {str(e)}")
            return None

    async def refresh_model_info(self):
        """Refresh the name <-> API key mappings for all configured apps."""
        self.name_to_api_key.clear()
        self.api_key_to_name.clear()

        for api_key in self.api_keys:
            app_name = await self.fetch_app_info(api_key)
            if app_name:
                self.name_to_api_key[app_name] = api_key
                self.api_key_to_name[api_key] = app_name
                logger.info(f"Mapped app '{app_name}' to API key: {api_key[:8]}...")

    def get_api_key(self, model_name):
        """Return the API key for a model (application) name."""
        return self.name_to_api_key.get(model_name)

    def get_available_models(self):
        """Return the configured apps as an OpenAI-style model list."""
        return [
            {
                "id": name,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "dify"
            }
            for name in self.name_to_api_key.keys()
        ]


model_manager = DifyModelManager()

app = Flask(__name__)


def get_api_key(model_name):
    """Look up the Dify API key for the requested model name."""
    api_key = model_manager.get_api_key(model_name)
    if not api_key:
        logger.warning(f"No API key found for model: {model_name}")
    return api_key

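# transform_openai_to_dify flattens an OpenAI-style message list into the single
# `query` string that Dify's /chat-messages endpoint expects, alongside `inputs`,
# `response_mode`, `user`, and (in zero-width mode) `conversation_id`.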
def transform_openai_to_dify(openai_request, endpoint):
    """Transform an OpenAI-format request into Dify format."""

    if endpoint == "/chat/completions":
        messages = openai_request.get("messages", [])
        stream = openai_request.get("stream", False)

        conversation_id = None

        # Pull out the system message, if any.
        system_content = ""
        system_messages = [msg for msg in messages if msg.get("role") == "system"]
        if system_messages:
            system_content = system_messages[0].get("content", "")
            logger.info(f"Found system message: {system_content[:100]}{'...' if len(system_content) > 100 else ''}")

        if CONVERSATION_MEMORY_MODE == 2:  # zero-width character mode
            if len(messages) > 1:
                # Look for a conversation_id hidden in a previous assistant reply.
                for msg in reversed(messages[:-1]):
                    if msg.get("role") == "assistant":
                        content = msg.get("content", "")
                        conversation_id = decode_conversation_id(content)
                        if conversation_id:
                            break

            user_query = messages[-1]["content"] if messages and messages[-1].get("role") != "system" else ""

            # On the first turn there is no conversation_id yet, so prepend the
            # system content to the query.
            if system_content and not conversation_id:
                user_query = f"System instructions: {system_content}\n\nUser question: {user_query}"
                logger.info("[zero-width mode] First turn, prepending system content to the query")

            dify_request = {
                "inputs": {},
                "query": user_query,
                "response_mode": "streaming" if stream else "blocking",
                "conversation_id": conversation_id,
                "user": openai_request.get("user", "default_user")
            }
        else:  # history_message mode
            user_query = messages[-1]["content"] if messages and messages[-1].get("role") != "system" else ""

            if len(messages) > 1:
                history_messages = []
                has_system_in_history = False

                for msg in messages[:-1]:
                    role = msg.get("role", "")
                    content = msg.get("content", "")
                    if role and content:
                        if role == "system":
                            has_system_in_history = True
                        history_messages.append(f"{role}: {content}")

                # If the history itself carries no system message, prepend it.
                if system_content and not has_system_in_history:
                    history_messages.insert(0, f"system: {system_content}")
                    logger.info("[history_message mode] Prepending system content to the history")

                if history_messages:
                    history_context = "\n\n".join(history_messages)
                    user_query = f"<history>\n{history_context}\n</history>\n\nCurrent user question: {user_query}"
                elif system_content:
                    user_query = f"System instructions: {system_content}\n\nUser question: {user_query}"
                    logger.info("[history_message mode] First turn, prepending system content to the query")

            dify_request = {
                "inputs": {},
                "query": user_query,
                "response_mode": "streaming" if stream else "blocking",
                "user": openai_request.get("user", "default_user")
            }

        return dify_request

    return None


def transform_dify_to_openai(dify_response, model="claude-3-5-sonnet-v2", stream=False):
    """Transform a Dify-format response into OpenAI format."""

    if not stream:
        answer = ""

        if "answer" in dify_response:
            answer = dify_response.get("answer", "")
        elif "agent_thoughts" in dify_response:
            # Agent responses carry their text in agent_thoughts; keep the
            # last non-empty thought.
            agent_thoughts = dify_response.get("agent_thoughts", [])
            for thought in agent_thoughts:
                if thought.get("thought"):
                    answer = thought.get("thought", "")

        # In zero-width mode, append the encoded conversation_id to the reply
        # unless an earlier assistant message already carries one.
        if CONVERSATION_MEMORY_MODE == 2:
            conversation_id = dify_response.get("conversation_id", "")
            history = dify_response.get("conversation_history", [])

            has_conversation_id = False
            if history:
                for msg in history:
                    if msg.get("role") == "assistant":
                        content = msg.get("content", "")
                        if decode_conversation_id(content) is not None:
                            has_conversation_id = True
                            break

            if conversation_id and not has_conversation_id:
                logger.info(f"[Debug] Inserting conversation_id: {conversation_id}, history_length: {len(history)}")
                encoded = encode_conversation_id(conversation_id)
                answer = answer + encoded
                logger.info(f"[Debug] Response content after insertion: {repr(answer)}")

        return {
            "id": dify_response.get("message_id", ""),
            "object": "chat.completion",
            "created": dify_response.get("created", int(time.time())),
            "model": model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": answer
                },
                "finish_reason": "stop"
            }]
        }
    else:
        # Streaming responses are re-shaped chunk by chunk in the route handler.
        return dify_response


def create_openai_stream_response(content, message_id, model="claude-3-5-sonnet-v2"):
    """Create an OpenAI-format streaming response chunk."""
    return {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": content
            },
            "finish_reason": None
        }]
    }


def encode_conversation_id(conversation_id):
    """Encode a conversation_id as a trailing run of invisible characters."""
    if not conversation_id:
        return ""

    encoded = base64.b64encode(conversation_id.encode()).decode()

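    # Each base64 character carries 6 bits. Split them into two 3-bit octal
    # digits and map each digit to one of eight zero-width / invisible Unicode
    # characters, so the marker can sit unnoticed at the end of visible text.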
    char_map = {
        '0': '\u200b',  # zero-width space
        '1': '\u200c',  # zero-width non-joiner
        '2': '\u200d',  # zero-width joiner
        '3': '\ufeff',  # zero-width no-break space
        '4': '\u2060',  # word joiner
        '5': '\u180e',  # Mongolian vowel separator
        '6': '\u2061',  # function application
        '7': '\u2062',  # invisible times
    }

    result = []
    for c in encoded:
        # Skip '=' padding entirely; the decoder restores it from the
        # character count.
        if c == '=':
            continue

        # Recover the 6-bit value of the base64 character.
        if c.isalpha():
            if c.isupper():
                val = ord(c) - ord('A')
            else:
                val = ord(c) - ord('a') + 26
        elif c.isdigit():
            val = int(c) + 52
        elif c == '+':
            val = 62
        else:  # '/'
            val = 63

        # Split into high and low octal digits and emit one invisible
        # character for each.
        first = (val >> 3) & 0x7
        second = val & 0x7
        result.append(char_map[str(first)])
        result.append(char_map[str(second)])

    return ''.join(result)

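# decode_conversation_id reverses the scheme above: it walks backwards from the
# end of a message, collects the trailing run of invisible characters, rebuilds
# the base64 string (re-adding padding), and decodes it.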
def decode_conversation_id(content):
    """Decode a conversation_id hidden at the end of a message, if any."""
    try:
        char_to_val = {
            '\u200b': '0',
            '\u200c': '1',
            '\u200d': '2',
            '\ufeff': '3',
            '\u2060': '4',
            '\u180e': '5',
            '\u2061': '6',
            '\u2062': '7',
        }

        # Collect the run of invisible characters at the end of the content.
        space_chars = []
        for c in reversed(content):
            if c not in char_to_val:
                break
            space_chars.append(c)

        if not space_chars:
            return None

        # Restore original order, then fold pairs of octal digits back into
        # 6-bit values and map them to base64 characters.
        space_chars.reverse()
        base64_chars = []
        for i in range(0, len(space_chars), 2):
            first = int(char_to_val[space_chars[i]], 8)
            if i + 1 < len(space_chars):
                second = int(char_to_val[space_chars[i + 1]], 8)
                val = (first << 3) | second
            else:
                # Defensive: an odd-length run should not occur with the
                # current encoder.
                val = first << 3

            if val < 26:
                base64_chars.append(chr(val + ord('A')))
            elif val < 52:
                base64_chars.append(chr(val - 26 + ord('a')))
            elif val < 62:
                base64_chars.append(str(val - 52))
            elif val == 62:
                base64_chars.append('+')
            else:
                base64_chars.append('/')

        # Re-add the base64 padding stripped by the encoder.
        padding = len(base64_chars) % 4
        if padding:
            base64_chars.extend(['='] * (4 - padding))

        base64_str = ''.join(base64_chars)
        return base64.b64decode(base64_str).decode()

    except Exception as e:
        logger.debug(f"Failed to decode conversation_id: {e}")
        return None


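# POST /v1/chat/completions: validate the client's Bearer key, resolve the
# requested model to a Dify app key, transform the request, proxy it to Dify's
# /chat-messages endpoint, and re-shape the reply (streamed or not) as OpenAI JSON.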
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    try:
        # Validate the client's Bearer token against VALID_API_KEYS.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({
                "error": {
                    "message": "Missing Authorization header",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return jsonify({
                "error": {
                    "message": "Invalid Authorization header format. Expected: Bearer <API_KEY>",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        provided_api_key = parts[1]
        if provided_api_key not in VALID_API_KEYS:
            return jsonify({
                "error": {
                    "message": "Invalid API key",
                    "type": "invalid_request_error",
                    "param": None,
                    "code": "invalid_api_key"
                }
            }), 401

        openai_request = request.get_json()
        logger.info(f"Received request: {json.dumps(openai_request, ensure_ascii=False)}")

        model = openai_request.get("model", "claude-3-5-sonnet-v2")

        api_key = get_api_key(model)
        if not api_key:
            error_msg = f"Model {model} is not supported. Available models: {', '.join(model_manager.name_to_api_key.keys())}"
            logger.error(error_msg)
            return {
                "error": {
                    "message": error_msg,
                    "type": "invalid_request_error",
                    "code": "model_not_found"
                }
            }, 404

        dify_request = transform_openai_to_dify(openai_request, "/chat/completions")

        if not dify_request:
            logger.error("Failed to transform request")
            return {
                "error": {
                    "message": "Invalid request format",
                    "type": "invalid_request_error",
                }
            }, 400

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        stream = openai_request.get("stream", False)
        dify_endpoint = f"{DIFY_API_BASE}/chat-messages"
        logger.info(f"Sending request to Dify endpoint: {dify_endpoint}, stream={stream}")

        if stream:
            def generate():
                client = httpx.Client(timeout=None)

                def flush_chunk(chunk_data):
                    """Encode a chunk for immediate flushing to the client."""
                    return chunk_data.encode('utf-8')

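                # Pacing: answer characters are queued in output_buffer and
                # re-emitted one at a time; see calculate_delay below. The delay
                # shrinks as the backlog grows, so long replies drain quickly
                # while short ones keep a smooth typewriter feel.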
                def calculate_delay(buffer_size):
                    """Pick the per-character delay from how many characters
                    are still waiting in the buffer."""
                    if buffer_size > 30:
                        return 0.001
                    elif buffer_size > 20:
                        return 0.002
                    elif buffer_size > 10:
                        return 0.01
                    else:
                        return 0.02

                def send_char(char, message_id):
                    """Build and encode a single-character OpenAI chunk."""
                    openai_chunk = {
                        "id": message_id,
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": model,
                        "choices": [{
                            "index": 0,
                            "delta": {
                                "content": char
                            },
                            "finish_reason": None
                        }]
                    }
                    chunk_data = f"data: {json.dumps(openai_chunk)}\n\n"
                    return flush_chunk(chunk_data)

                output_buffer = []

                try:
                    with client.stream(
                        'POST',
                        dify_endpoint,
                        json=dify_request,
                        headers={
                            **headers,
                            'Accept': 'text/event-stream',
                            'Cache-Control': 'no-cache',
                            'Connection': 'keep-alive'
                        }
                    ) as response:
                        generate.message_id = None
                        buffer = ""

                        for raw_bytes in response.iter_raw():
                            if not raw_bytes:
                                continue

                            try:
                                buffer += raw_bytes.decode('utf-8')

                                # Emit complete SSE lines; keep any partial line buffered.
                                while '\n' in buffer:
                                    line, buffer = buffer.split('\n', 1)
                                    line = line.strip()

                                    if not line or not line.startswith('data: '):
                                        continue

                                    try:
                                        json_str = line[6:]
                                        dify_chunk = json.loads(json_str)

                                        # Plain and agent answers are handled the same
                                        # way: queue the characters, then drain the
                                        # buffer with a size-dependent delay.
                                        if dify_chunk.get("event") in ("message", "agent_message") and "answer" in dify_chunk:
                                            current_answer = dify_chunk["answer"]
                                            if not current_answer:
                                                continue

                                            message_id = dify_chunk.get("message_id", "")
                                            if not generate.message_id:
                                                generate.message_id = message_id

                                            for char in current_answer:
                                                output_buffer.append((char, generate.message_id))

                                            while output_buffer:
                                                char, msg_id = output_buffer.pop(0)
                                                yield send_char(char, msg_id)
                                                delay = calculate_delay(len(output_buffer))
                                                time.sleep(delay)

                                            continue

                                        elif dify_chunk.get("event") == "agent_thought":
                                            # Agent thoughts are logged but not forwarded.
                                            thought_id = dify_chunk.get("id", "")
                                            thought = dify_chunk.get("thought", "")
                                            tool = dify_chunk.get("tool", "")
                                            tool_input = dify_chunk.get("tool_input", "")
                                            observation = dify_chunk.get("observation", "")

                                            logger.info(f"[Agent Thought] ID: {thought_id}, Tool: {tool}")
                                            if thought:
                                                logger.info(f"[Agent Thought] Thought: {thought}")
                                            if tool_input:
                                                logger.info(f"[Agent Thought] Tool Input: {tool_input}")
                                            if observation:
                                                logger.info(f"[Agent Thought] Observation: {observation}")

                                            message_id = dify_chunk.get("message_id", "")
                                            if not generate.message_id and message_id:
                                                generate.message_id = message_id

                                            continue

                                        elif dify_chunk.get("event") == "message_file":
                                            file_id = dify_chunk.get("id", "")
                                            file_type = dify_chunk.get("type", "")
                                            file_url = dify_chunk.get("url", "")

                                            logger.info(f"[Message File] ID: {file_id}, Type: {file_type}, URL: {file_url}")
                                            continue

                                        elif dify_chunk.get("event") == "message_end":
                                            # Drain anything still buffered.
                                            while output_buffer:
                                                char, msg_id = output_buffer.pop(0)
                                                yield send_char(char, msg_id)
                                                time.sleep(0.001)

                                            # In zero-width mode, append the encoded
                                            # conversation_id unless the history
                                            # already carries one.
                                            if CONVERSATION_MEMORY_MODE == 2:
                                                conversation_id = dify_chunk.get("conversation_id")
                                                history = dify_chunk.get("conversation_history", [])

                                                has_conversation_id = False
                                                if history:
                                                    for msg in history:
                                                        if msg.get("role") == "assistant":
                                                            content = msg.get("content", "")
                                                            if decode_conversation_id(content) is not None:
                                                                has_conversation_id = True
                                                                break

                                                if conversation_id and not has_conversation_id:
                                                    logger.info(f"[Debug] Inserting conversation_id in stream: {conversation_id}")
                                                    encoded = encode_conversation_id(conversation_id)
                                                    logger.info(f"[Debug] Stream encoded content: {repr(encoded)}")
                                                    for char in encoded:
                                                        yield send_char(char, generate.message_id)

                                            final_chunk = {
                                                "id": generate.message_id,
                                                "object": "chat.completion.chunk",
                                                "created": int(time.time()),
                                                "model": model,
                                                "choices": [{
                                                    "index": 0,
                                                    "delta": {},
                                                    "finish_reason": "stop"
                                                }]
                                            }
                                            yield flush_chunk(f"data: {json.dumps(final_chunk)}\n\n")
                                            yield flush_chunk("data: [DONE]\n\n")

                                    except json.JSONDecodeError as e:
                                        logger.error(f"JSON decode error: {str(e)}")
                                        continue

                            except Exception as e:
                                logger.error(f"Error processing chunk: {str(e)}")
                                continue

                finally:
                    client.close()

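            # Anti-buffering hints: 'X-Accel-Buffering: no' keeps nginx-style
            # proxies from buffering the stream, and no-cache/no-transform keep
            # intermediaries from batching or re-encoding chunks.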
            return Response(
                stream_with_context(generate()),
                content_type='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache, no-transform',
                    'Connection': 'keep-alive',
                    'Transfer-Encoding': 'chunked',
                    'X-Accel-Buffering': 'no',
                    'Content-Encoding': 'none'
                },
                direct_passthrough=True
            )
        else:
            async def sync_response():
                try:
                    async with httpx.AsyncClient() as client:
                        response = await client.post(
                            dify_endpoint,
                            json=dify_request,
                            headers=headers
                        )

                        if response.status_code != 200:
                            error_msg = f"Dify API error: {response.text}"
                            logger.error(f"Request failed: {error_msg}")
                            return {
                                "error": {
                                    "message": error_msg,
                                    "type": "api_error",
                                    "code": response.status_code
                                }
                            }, response.status_code

                        dify_response = response.json()
                        logger.info(f"Received response from Dify: {json.dumps(dify_response, ensure_ascii=False)}")
                        logger.info(f"[Debug] Response content: {repr(dify_response.get('answer', ''))}")
                        openai_response = transform_dify_to_openai(dify_response, model=model)
                        conversation_id = dify_response.get("conversation_id")
                        if conversation_id:
                            # Also surface the conversation_id as a response header.
                            return Response(
                                json.dumps(openai_response),
                                content_type='application/json',
                                headers={
                                    'Conversation-Id': conversation_id
                                }
                            )
                        else:
                            return openai_response
                except httpx.RequestError as e:
                    error_msg = f"Failed to connect to Dify: {str(e)}"
                    logger.error(error_msg)
                    return {
                        "error": {
                            "message": error_msg,
                            "type": "api_error",
                            "code": "connection_error"
                        }
                    }, 503

            return asyncio.run(sync_response())

    except Exception as e:
        logger.exception("Unexpected error occurred")
        return {
            "error": {
                "message": str(e),
                "type": "internal_error",
            }
        }, 500


@app.route('/v1/models', methods=['GET'])
def list_models():
    """Return the list of available models."""
    logger.info("Listing available models")

    # Refresh the app-name mappings from Dify on every call so the list
    # stays in sync with the configured API keys.
    asyncio.run(model_manager.refresh_model_info())

    available_models = model_manager.get_available_models()

    response = {
        "object": "list",
        "data": available_models
    }
    logger.info(f"Available models: {json.dumps(response, ensure_ascii=False)}")
    return response


if __name__ == "__main__":
    # Build the model list once at startup.
    asyncio.run(model_manager.refresh_model_info())

    host = os.getenv("SERVER_HOST", "0.0.0.0")
    port = int(os.getenv("SERVER_PORT", 7860))

    logger.info(f"Starting server on {host}:{port}")
    app.run(host=host, port=port)