|
|
|
|
|
|
|
"""
|
|
Dify API 客户端
|
|
从本地txt文件读取内容,发送到Dify API
|
|
支持阻塞模式和流式模式
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import requests
|
|
import httpx
|
|
import sseclient
|
|
import logging
|
|
import configparser
|
|
from typing import Dict, Any, Optional, Union
|
|
from datetime import datetime
|
|
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
|
|
|
|
|
|
|
|
def setup_logging(debug: bool = False, disable_file_logging: bool = False) -> None:
|
|
"""
|
|
设置日志记录
|
|
|
|
Args:
|
|
debug: 是否启用调试模式
|
|
disable_file_logging: 是否禁用文件日志(在某些环境如Hugging Face Space中可能需要)
|
|
"""
|
|
log_level = logging.DEBUG if debug else logging.INFO
|
|
log_format = '%(asctime)s - %(levelname)s - %(message)s'
|
|
|
|
handlers = []
|
|
|
|
|
|
if not disable_file_logging:
|
|
|
|
os.makedirs('logs', exist_ok=True)
|
|
|
|
|
|
log_file = f'logs/dify_api_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
|
|
handlers.append(logging.FileHandler(log_file, encoding='utf-8'))
|
|
|
|
|
|
handlers.append(logging.StreamHandler())
|
|
|
|
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format=log_format,
|
|
handlers=handlers
|
|
)
|
|
|
|
|
|
def load_config(config_path='config.ini'):
|
|
"""
|
|
从配置文件加载设置
|
|
|
|
Args:
|
|
config_path: 配置文件路径
|
|
|
|
Returns:
|
|
配置对象
|
|
"""
|
|
config = configparser.ConfigParser()
|
|
|
|
|
|
if os.path.exists(config_path):
|
|
config.read(config_path, encoding='utf-8')
|
|
logging.debug(f"已加载配置文件: {config_path}")
|
|
else:
|
|
logging.warning(f"配置文件不存在: {config_path}, 将使用默认值或命令行参数")
|
|
|
|
return config
|
|
|
|
|
|
class DifyAPIClient:
|
|
"""Dify API 客户端类"""
|
|
|
|
DEFAULT_BASE_URL = "http://114.132.220.127:8088/v1"
|
|
|
|
def __init__(self, api_key: str, base_url: str = None):
|
|
"""
|
|
初始化Dify API客户端
|
|
|
|
Args:
|
|
api_key: API密钥
|
|
base_url: API基础URL,如果未提供则使用默认值
|
|
"""
|
|
self.api_key = api_key
|
|
self.base_url = base_url or self.DEFAULT_BASE_URL
|
|
self.headers = {
|
|
"Authorization": f"Bearer {api_key}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
logging.debug(f"初始化Dify客户端,BASE_URL: {self.base_url}")
|
|
|
|
def read_content_from_file(self, file_path: str) -> str:
|
|
"""
|
|
从文件中读取内容
|
|
|
|
Args:
|
|
file_path: 文件路径
|
|
|
|
Returns:
|
|
文件内容
|
|
"""
|
|
try:
|
|
logging.info(f"正在读取文件: {file_path}")
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
content = file.read().strip()
|
|
logging.debug(f"成功读取文件,内容长度: {len(content)} 字符")
|
|
return content
|
|
except FileNotFoundError:
|
|
logging.error(f"文件不存在: {file_path}")
|
|
print(f"错误: 文件 '{file_path}' 不存在")
|
|
sys.exit(1)
|
|
except PermissionError:
|
|
logging.error(f"没有权限访问文件: {file_path}")
|
|
print(f"错误: 没有权限读取文件 '{file_path}'")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
logging.error(f"读取文件时出错: {str(e)}", exc_info=True)
|
|
print(f"读取文件时出错: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=2, max=10),
|
|
retry=retry_if_exception_type((requests.exceptions.Timeout, requests.exceptions.ConnectionError))
|
|
)
|
|
def send_request_blocking(self, content: str, conversation_id: str = None, user: str = "user") -> Dict[str, Any]:
|
|
"""
|
|
发送阻塞模式请求,带有自动重试机制
|
|
|
|
Args:
|
|
content: 请求内容
|
|
conversation_id: 会话ID
|
|
user: 用户标识
|
|
|
|
Returns:
|
|
API响应
|
|
"""
|
|
url = f"{self.base_url}/chat-messages"
|
|
|
|
payload = {
|
|
"query": content,
|
|
"inputs": {},
|
|
"response_mode": "blocking",
|
|
"user": user
|
|
}
|
|
|
|
if conversation_id:
|
|
payload["conversation_id"] = conversation_id
|
|
|
|
logging.info(f"发送阻塞模式请求,用户: {user}")
|
|
if conversation_id:
|
|
logging.info(f"会话ID: {conversation_id}")
|
|
|
|
logging.debug(f"请求URL: {url}")
|
|
logging.debug(f"请求负载: {json.dumps(payload, ensure_ascii=False)}")
|
|
|
|
try:
|
|
|
|
response = requests.post(url, headers=self.headers, json=payload, timeout=(60, 120))
|
|
response.raise_for_status()
|
|
response_data = response.json()
|
|
logging.info("API请求成功")
|
|
logging.debug(f"API响应: {json.dumps(response_data, ensure_ascii=False)}")
|
|
return response_data
|
|
except requests.exceptions.HTTPError as e:
|
|
logging.error(f"HTTP错误: {e}")
|
|
if hasattr(response, 'text'):
|
|
logging.error(f"API响应内容: {response.text}")
|
|
raise Exception(f"HTTP错误: {e}")
|
|
except requests.exceptions.ConnectionError as e:
|
|
logging.error(f"连接错误: 无法连接到API服务器: {e}")
|
|
raise
|
|
except requests.exceptions.Timeout as e:
|
|
logging.error(f"超时错误: API请求超时: {e}")
|
|
raise
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"请求错误: {str(e)}", exc_info=True)
|
|
raise Exception(f"请求错误: {e}")
|
|
except json.JSONDecodeError:
|
|
logging.error("解析错误: API返回的响应不是有效的JSON格式")
|
|
if hasattr(response, 'text'):
|
|
logging.error(f"API响应内容: {response.text}")
|
|
raise Exception("解析错误: API返回的响应不是有效的JSON格式")
|
|
except Exception as e:
|
|
logging.error(f"未知错误: {str(e)}", exc_info=True)
|
|
raise Exception(f"发送请求时发生未知错误: {str(e)}")
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=2, max=10),
|
|
retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError))
|
|
)
|
|
async def send_request_blocking_async(self, content: str, conversation_id: str = None, user: str = "user") -> Dict[str, Any]:
|
|
"""
|
|
异步发送阻塞模式请求,带有自动重试机制
|
|
|
|
Args:
|
|
content: 请求内容
|
|
conversation_id: 会话ID
|
|
user: 用户标识
|
|
|
|
Returns:
|
|
API响应
|
|
"""
|
|
url = f"{self.base_url}/chat-messages"
|
|
|
|
payload = {
|
|
"query": content,
|
|
"inputs": {},
|
|
"response_mode": "blocking",
|
|
"user": user
|
|
}
|
|
|
|
if conversation_id:
|
|
payload["conversation_id"] = conversation_id
|
|
|
|
logging.info(f"异步发送阻塞模式请求,用户: {user}")
|
|
if conversation_id:
|
|
logging.info(f"会话ID: {conversation_id}")
|
|
|
|
logging.debug(f"请求URL: {url}")
|
|
logging.debug(f"请求负载: {json.dumps(payload, ensure_ascii=False)}")
|
|
|
|
try:
|
|
|
|
timeout = httpx.Timeout(60.0, read=120.0)
|
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
|
response = await client.post(url, headers=self.headers, json=payload)
|
|
response.raise_for_status()
|
|
response_data = response.json()
|
|
logging.info("API请求成功")
|
|
logging.debug(f"API响应: {json.dumps(response_data, ensure_ascii=False)}")
|
|
return response_data
|
|
except httpx.HTTPStatusError as e:
|
|
logging.error(f"HTTP错误: {e}")
|
|
if hasattr(e.response, 'text'):
|
|
logging.error(f"API响应内容: {e.response.text}")
|
|
raise Exception(f"HTTP错误: {e}")
|
|
except httpx.ConnectError as e:
|
|
logging.error(f"连接错误: 无法连接到API服务器: {e}")
|
|
raise
|
|
except httpx.TimeoutException as e:
|
|
logging.error(f"超时错误: API请求超时: {e}")
|
|
raise
|
|
except httpx.RequestError as e:
|
|
logging.error(f"请求错误: {str(e)}", exc_info=True)
|
|
raise Exception(f"请求错误: {e}")
|
|
except json.JSONDecodeError:
|
|
logging.error("解析错误: API返回的响应不是有效的JSON格式")
|
|
raise Exception("解析错误: API返回的响应不是有效的JSON格式")
|
|
except Exception as e:
|
|
logging.error(f"未知错误: {str(e)}", exc_info=True)
|
|
raise Exception(f"发送请求时发生未知错误: {str(e)}")
|
|
|
|
def send_request_streaming(self, content: str, conversation_id: str = None, user: str = "user") -> str:
|
|
"""
|
|
发送流式模式请求
|
|
|
|
Args:
|
|
content: 请求内容
|
|
conversation_id: 会话ID
|
|
user: 用户标识
|
|
|
|
Returns:
|
|
完整的响应文本
|
|
"""
|
|
url = f"{self.base_url}/chat-messages"
|
|
|
|
payload = {
|
|
"query": content,
|
|
"inputs": {},
|
|
"response_mode": "streaming",
|
|
"user": user
|
|
}
|
|
|
|
if conversation_id:
|
|
payload["conversation_id"] = conversation_id
|
|
|
|
logging.info(f"发送流式模式请求,用户: {user}")
|
|
if conversation_id:
|
|
logging.info(f"会话ID: {conversation_id}")
|
|
|
|
logging.debug(f"请求URL: {url}")
|
|
logging.debug(f"请求负载: {json.dumps(payload, ensure_ascii=False)}")
|
|
|
|
try:
|
|
|
|
response = requests.post(url, headers=self.headers, json=payload, stream=True, timeout=(60, 300))
|
|
response.raise_for_status()
|
|
|
|
|
|
client = sseclient.SSEClient(response)
|
|
|
|
|
|
full_answer = ""
|
|
conversation_id = None
|
|
|
|
|
|
for event in client.events():
|
|
try:
|
|
|
|
data = json.loads(event.data)
|
|
event_type = data.get("event")
|
|
|
|
|
|
if not conversation_id and "conversation_id" in data:
|
|
conversation_id = data["conversation_id"]
|
|
logging.debug(f"获取到会话ID: {conversation_id}")
|
|
|
|
if event_type == "message":
|
|
|
|
answer_chunk = data.get("answer", "")
|
|
full_answer += answer_chunk
|
|
print(answer_chunk, end="", flush=True)
|
|
logging.debug(f"接收到消息事件: {answer_chunk}")
|
|
|
|
elif event_type == "message_end":
|
|
|
|
print("\n\n--- 消息结束 ---")
|
|
logging.info("消息流结束")
|
|
|
|
|
|
if "metadata" in data:
|
|
usage = data["metadata"].get("usage", {})
|
|
if usage:
|
|
print(f"\n使用情况统计:")
|
|
print(f" 总Token数: {usage.get('total_tokens', 'N/A')}")
|
|
print(f" 提示Token数: {usage.get('prompt_tokens', 'N/A')}")
|
|
print(f" 补全Token数: {usage.get('completion_tokens', 'N/A')}")
|
|
print(f" 总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}")
|
|
|
|
logging.info(f"使用统计 - 总Token数: {usage.get('total_tokens', 'N/A')}, "
|
|
f"总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}")
|
|
|
|
elif event_type == "workflow_started":
|
|
logging.info("工作流开始")
|
|
|
|
elif event_type == "node_started":
|
|
node_id = data.get("data", {}).get("node_id", "unknown")
|
|
logging.debug(f"节点开始执行: {node_id}")
|
|
|
|
elif event_type == "node_finished":
|
|
node_id = data.get("data", {}).get("node_id", "unknown")
|
|
status = data.get("data", {}).get("status", "unknown")
|
|
logging.debug(f"节点执行完成: {node_id}, 状态: {status}")
|
|
|
|
elif event_type == "workflow_finished":
|
|
status = data.get("data", {}).get("status", "unknown")
|
|
logging.info(f"工作流完成,状态: {status}")
|
|
|
|
elif event_type == "error":
|
|
|
|
error_message = data.get("message", "未知错误")
|
|
logging.error(f"API错误: {error_message}")
|
|
print(f"\n错误: {error_message}")
|
|
break
|
|
|
|
except json.JSONDecodeError:
|
|
logging.error(f"解析错误: 无法解析事件数据: {event.data}")
|
|
print(f"\n解析错误: 无法解析事件数据: {event.data}")
|
|
except Exception as e:
|
|
logging.error(f"处理事件时出错: {str(e)}", exc_info=True)
|
|
print(f"\n处理事件时出错: {str(e)}")
|
|
|
|
|
|
if conversation_id:
|
|
print(f"\n会话ID: {conversation_id}")
|
|
|
|
return full_answer
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
logging.error(f"HTTP错误: {e}")
|
|
if hasattr(response, 'text'):
|
|
logging.error(f"API响应内容: {response.text}")
|
|
print(f"HTTP错误: {e}")
|
|
if hasattr(response, 'text') and response.text:
|
|
print(f"API响应: {response.text}")
|
|
sys.exit(1)
|
|
except requests.exceptions.ConnectionError:
|
|
logging.error("连接错误: 无法连接到API服务器")
|
|
print("连接错误: 无法连接到API服务器")
|
|
sys.exit(1)
|
|
except requests.exceptions.Timeout:
|
|
logging.error("超时错误: API请求超时")
|
|
print("超时错误: API请求超时")
|
|
sys.exit(1)
|
|
except requests.exceptions.RequestException as e:
|
|
logging.error(f"请求错误: {str(e)}", exc_info=True)
|
|
print(f"请求错误: {e}")
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
logging.error(f"处理流式响应时出错: {str(e)}", exc_info=True)
|
|
print(f"处理流式响应时出错: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
|
|
config = load_config()
|
|
|
|
|
|
default_api_key = config.get('API', 'api_key', fallback=None)
|
|
default_mode = config.get('SETTINGS', 'default_mode', fallback='streaming')
|
|
default_user = config.get('SETTINGS', 'default_user', fallback='user')
|
|
default_base_url = config.get('API', 'base_url', fallback=None)
|
|
|
|
|
|
default_file = 'example_query.txt'
|
|
|
|
parser = argparse.ArgumentParser(description='Dify API 客户端 - 从本地txt文件读取内容发送到API')
|
|
parser.add_argument('file_path', nargs='?', default=default_file,
|
|
help=f'本地txt文件路径 (默认: {default_file})')
|
|
parser.add_argument('--api-key', help='Dify API密钥(如果未指定,将从config.ini读取)')
|
|
parser.add_argument('--base-url', help='Dify API基础URL(如果未指定,将从config.ini读取)')
|
|
parser.add_argument('--mode', choices=['blocking', 'streaming'], default=default_mode,
|
|
help=f'请求模式: blocking(阻塞) 或 streaming(流式), 默认为 {default_mode}')
|
|
parser.add_argument('--conversation-id', help='会话ID,用于继续已有对话')
|
|
parser.add_argument('--user', default=default_user, help=f'用户标识, 默认为 {default_user}')
|
|
parser.add_argument('--debug', action='store_true', help='启用调试日志')
|
|
parser.add_argument('--no-file-log', action='store_true', help='禁用文件日志')
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
setup_logging(args.debug, args.no_file_log)
|
|
|
|
logging.info("Dify API客户端启动")
|
|
logging.info(f"模式: {args.mode}")
|
|
|
|
|
|
api_key = args.api_key or default_api_key
|
|
|
|
if api_key is None or api_key == 'your_api_key_here':
|
|
logging.error("未提供API密钥,请在命令行参数中使用--api-key指定,或在config.ini中配置")
|
|
print("错误: 未提供API密钥,请使用--api-key参数提供,或在config.ini中配置")
|
|
sys.exit(1)
|
|
|
|
|
|
base_url = args.base_url or default_base_url
|
|
|
|
try:
|
|
|
|
client = DifyAPIClient(api_key, base_url)
|
|
|
|
|
|
content = client.read_content_from_file(args.file_path)
|
|
|
|
if not content:
|
|
logging.error("文件内容为空")
|
|
print("错误: 文件内容为空")
|
|
sys.exit(1)
|
|
|
|
print(f"正在使用 {args.mode} 模式发送请求...")
|
|
|
|
|
|
if args.mode == 'blocking':
|
|
response = client.send_request_blocking(content, args.conversation_id, args.user)
|
|
print("\n响应:")
|
|
print(f"{response['answer']}")
|
|
|
|
if 'metadata' in response and 'usage' in response['metadata']:
|
|
usage = response['metadata']['usage']
|
|
print(f"\n使用情况统计:")
|
|
print(f" 总Token数: {usage.get('total_tokens', 'N/A')}")
|
|
print(f" 提示Token数: {usage.get('prompt_tokens', 'N/A')}")
|
|
print(f" 补全Token数: {usage.get('completion_tokens', 'N/A')}")
|
|
print(f" 总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}")
|
|
|
|
print(f"\n会话ID: {response.get('conversation_id', 'N/A')}")
|
|
else:
|
|
client.send_request_streaming(content, args.conversation_id, args.user)
|
|
|
|
logging.info("请求执行完毕")
|
|
|
|
except KeyboardInterrupt:
|
|
logging.info("用户中断操作")
|
|
print("\n操作被用户中断")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
logging.error(f"未捕获的异常: {str(e)}", exc_info=True)
|
|
print(f"发生未知错误: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |