#!/usr/bin/env python # -*- coding: utf-8 -*- """ Dify API 客户端 从本地txt文件读取内容,发送到Dify API 支持阻塞模式和流式模式 """ import os import sys import json import argparse import requests import sseclient import logging import configparser from typing import Dict, Any, Optional, Union from datetime import datetime # 配置日志 def setup_logging(debug: bool = False, disable_file_logging: bool = False) -> None: """ 设置日志记录 Args: debug: 是否启用调试模式 disable_file_logging: 是否禁用文件日志(在某些环境如Hugging Face Space中可能需要) """ log_level = logging.DEBUG if debug else logging.INFO log_format = '%(asctime)s - %(levelname)s - %(message)s' handlers = [] # 如果不禁用文件日志,则添加文件处理器 if not disable_file_logging: # 创建logs目录(如果不存在) os.makedirs('logs', exist_ok=True) # 设置文件日志 log_file = f'logs/dify_api_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' handlers.append(logging.FileHandler(log_file, encoding='utf-8')) # 始终添加控制台处理器 handlers.append(logging.StreamHandler()) # 配置日志记录器 logging.basicConfig( level=log_level, format=log_format, handlers=handlers ) # 加载配置文件 def load_config(config_path='config.ini'): """ 从配置文件加载设置 Args: config_path: 配置文件路径 Returns: 配置对象 """ config = configparser.ConfigParser() # 检查配置文件是否存在 if os.path.exists(config_path): config.read(config_path, encoding='utf-8') logging.debug(f"已加载配置文件: {config_path}") else: logging.warning(f"配置文件不存在: {config_path}, 将使用默认值或命令行参数") return config class DifyAPIClient: """Dify API 客户端类""" DEFAULT_BASE_URL = "http://114.132.220.127:8088/v1" def __init__(self, api_key: str, base_url: str = None): """ 初始化Dify API客户端 Args: api_key: API密钥 base_url: API基础URL,如果未提供则使用默认值 """ self.api_key = api_key self.base_url = base_url or self.DEFAULT_BASE_URL self.headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } logging.debug(f"初始化Dify客户端,BASE_URL: {self.base_url}") def read_content_from_file(self, file_path: str) -> str: """ 从文件中读取内容 Args: file_path: 文件路径 Returns: 文件内容 """ try: logging.info(f"正在读取文件: {file_path}") with open(file_path, 'r', encoding='utf-8') as file: content = file.read().strip() logging.debug(f"成功读取文件,内容长度: {len(content)} 字符") return content except FileNotFoundError: logging.error(f"文件不存在: {file_path}") print(f"错误: 文件 '{file_path}' 不存在") sys.exit(1) except PermissionError: logging.error(f"没有权限访问文件: {file_path}") print(f"错误: 没有权限读取文件 '{file_path}'") sys.exit(1) except Exception as e: logging.error(f"读取文件时出错: {str(e)}", exc_info=True) print(f"读取文件时出错: {str(e)}") sys.exit(1) def send_request_blocking(self, content: str, conversation_id: str = None, user: str = "user") -> Dict[str, Any]: """ 发送阻塞模式请求 Args: content: 请求内容 conversation_id: 会话ID user: 用户标识 Returns: API响应 """ url = f"{self.base_url}/chat-messages" payload = { "query": content, "inputs": {}, "response_mode": "blocking", "user": user } if conversation_id: payload["conversation_id"] = conversation_id logging.info(f"发送阻塞模式请求,用户: {user}") if conversation_id: logging.info(f"会话ID: {conversation_id}") logging.debug(f"请求URL: {url}") logging.debug(f"请求负载: {json.dumps(payload, ensure_ascii=False)}") try: response = requests.post(url, headers=self.headers, json=payload) response.raise_for_status() response_data = response.json() logging.info("API请求成功") logging.debug(f"API响应: {json.dumps(response_data, ensure_ascii=False)}") return response_data except requests.exceptions.HTTPError as e: logging.error(f"HTTP错误: {e}") if hasattr(response, 'text'): logging.error(f"API响应内容: {response.text}") print(f"HTTP错误: {e}") if hasattr(response, 'text') and response.text: print(f"API响应: {response.text}") sys.exit(1) except requests.exceptions.ConnectionError: logging.error("连接错误: 无法连接到API服务器") print("连接错误: 无法连接到API服务器") sys.exit(1) except requests.exceptions.Timeout: logging.error("超时错误: API请求超时") print("超时错误: API请求超时") sys.exit(1) except requests.exceptions.RequestException as e: logging.error(f"请求错误: {str(e)}", exc_info=True) print(f"请求错误: {e}") sys.exit(1) except json.JSONDecodeError: logging.error("解析错误: API返回的响应不是有效的JSON格式") if hasattr(response, 'text'): logging.error(f"API响应内容: {response.text}") print("解析错误: API返回的响应不是有效的JSON格式") if hasattr(response, 'text'): print(f"API响应: {response.text}") sys.exit(1) except Exception as e: logging.error(f"未知错误: {str(e)}", exc_info=True) print(f"发送请求时发生未知错误: {str(e)}") sys.exit(1) def send_request_streaming(self, content: str, conversation_id: str = None, user: str = "user") -> str: """ 发送流式模式请求 Args: content: 请求内容 conversation_id: 会话ID user: 用户标识 Returns: 完整的响应文本 """ url = f"{self.base_url}/chat-messages" payload = { "query": content, "inputs": {}, "response_mode": "streaming", "user": user } if conversation_id: payload["conversation_id"] = conversation_id logging.info(f"发送流式模式请求,用户: {user}") if conversation_id: logging.info(f"会话ID: {conversation_id}") logging.debug(f"请求URL: {url}") logging.debug(f"请求负载: {json.dumps(payload, ensure_ascii=False)}") try: # 使用流式请求 response = requests.post(url, headers=self.headers, json=payload, stream=True) response.raise_for_status() # 创建SSE客户端 client = sseclient.SSEClient(response) # 初始化变量,用于累积回答 full_answer = "" conversation_id = None # 处理每个事件 for event in client.events(): try: # 解析事件数据 data = json.loads(event.data) event_type = data.get("event") # 记录会话ID(如果存在且我们尚未捕获它) if not conversation_id and "conversation_id" in data: conversation_id = data["conversation_id"] logging.debug(f"获取到会话ID: {conversation_id}") if event_type == "message": # 提取回答文本并打印 answer_chunk = data.get("answer", "") full_answer += answer_chunk print(answer_chunk, end="", flush=True) logging.debug(f"接收到消息事件: {answer_chunk}") elif event_type == "message_end": # 消息结束事件 print("\n\n--- 消息结束 ---") logging.info("消息流结束") # 输出元数据 if "metadata" in data: usage = data["metadata"].get("usage", {}) if usage: print(f"\n使用情况统计:") print(f" 总Token数: {usage.get('total_tokens', 'N/A')}") print(f" 提示Token数: {usage.get('prompt_tokens', 'N/A')}") print(f" 补全Token数: {usage.get('completion_tokens', 'N/A')}") print(f" 总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}") logging.info(f"使用统计 - 总Token数: {usage.get('total_tokens', 'N/A')}, " f"总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}") elif event_type == "workflow_started": logging.info("工作流开始") elif event_type == "node_started": node_id = data.get("data", {}).get("node_id", "unknown") logging.debug(f"节点开始执行: {node_id}") elif event_type == "node_finished": node_id = data.get("data", {}).get("node_id", "unknown") status = data.get("data", {}).get("status", "unknown") logging.debug(f"节点执行完成: {node_id}, 状态: {status}") elif event_type == "workflow_finished": status = data.get("data", {}).get("status", "unknown") logging.info(f"工作流完成,状态: {status}") elif event_type == "error": # 错误事件 error_message = data.get("message", "未知错误") logging.error(f"API错误: {error_message}") print(f"\n错误: {error_message}") break except json.JSONDecodeError: logging.error(f"解析错误: 无法解析事件数据: {event.data}") print(f"\n解析错误: 无法解析事件数据: {event.data}") except Exception as e: logging.error(f"处理事件时出错: {str(e)}", exc_info=True) print(f"\n处理事件时出错: {str(e)}") # 如果获取到了会话ID,显示给用户 if conversation_id: print(f"\n会话ID: {conversation_id}") return full_answer except requests.exceptions.HTTPError as e: logging.error(f"HTTP错误: {e}") if hasattr(response, 'text'): logging.error(f"API响应内容: {response.text}") print(f"HTTP错误: {e}") if hasattr(response, 'text') and response.text: print(f"API响应: {response.text}") sys.exit(1) except requests.exceptions.ConnectionError: logging.error("连接错误: 无法连接到API服务器") print("连接错误: 无法连接到API服务器") sys.exit(1) except requests.exceptions.Timeout: logging.error("超时错误: API请求超时") print("超时错误: API请求超时") sys.exit(1) except requests.exceptions.RequestException as e: logging.error(f"请求错误: {str(e)}", exc_info=True) print(f"请求错误: {e}") sys.exit(1) except Exception as e: logging.error(f"处理流式响应时出错: {str(e)}", exc_info=True) print(f"处理流式响应时出错: {str(e)}") sys.exit(1) def main(): """主函数""" # 加载配置文件 config = load_config() # 从配置文件获取默认值 default_api_key = config.get('API', 'api_key', fallback=None) default_mode = config.get('SETTINGS', 'default_mode', fallback='streaming') default_user = config.get('SETTINGS', 'default_user', fallback='user') default_base_url = config.get('API', 'base_url', fallback=None) # 设置默认文件 default_file = 'example_query.txt' parser = argparse.ArgumentParser(description='Dify API 客户端 - 从本地txt文件读取内容发送到API') parser.add_argument('file_path', nargs='?', default=default_file, help=f'本地txt文件路径 (默认: {default_file})') parser.add_argument('--api-key', help='Dify API密钥(如果未指定,将从config.ini读取)') parser.add_argument('--base-url', help='Dify API基础URL(如果未指定,将从config.ini读取)') parser.add_argument('--mode', choices=['blocking', 'streaming'], default=default_mode, help=f'请求模式: blocking(阻塞) 或 streaming(流式), 默认为 {default_mode}') parser.add_argument('--conversation-id', help='会话ID,用于继续已有对话') parser.add_argument('--user', default=default_user, help=f'用户标识, 默认为 {default_user}') parser.add_argument('--debug', action='store_true', help='启用调试日志') parser.add_argument('--no-file-log', action='store_true', help='禁用文件日志') args = parser.parse_args() # 设置日志 setup_logging(args.debug, args.no_file_log) logging.info("Dify API客户端启动") logging.info(f"模式: {args.mode}") # 确定API密钥 api_key = args.api_key or default_api_key if api_key is None or api_key == 'your_api_key_here': logging.error("未提供API密钥,请在命令行参数中使用--api-key指定,或在config.ini中配置") print("错误: 未提供API密钥,请使用--api-key参数提供,或在config.ini中配置") sys.exit(1) # 确定API基础URL base_url = args.base_url or default_base_url try: # 创建客户端实例 client = DifyAPIClient(api_key, base_url) # 从文件读取内容 content = client.read_content_from_file(args.file_path) if not content: logging.error("文件内容为空") print("错误: 文件内容为空") sys.exit(1) print(f"正在使用 {args.mode} 模式发送请求...") # 根据模式发送请求 if args.mode == 'blocking': response = client.send_request_blocking(content, args.conversation_id, args.user) print("\n响应:") print(f"{response['answer']}") if 'metadata' in response and 'usage' in response['metadata']: usage = response['metadata']['usage'] print(f"\n使用情况统计:") print(f" 总Token数: {usage.get('total_tokens', 'N/A')}") print(f" 提示Token数: {usage.get('prompt_tokens', 'N/A')}") print(f" 补全Token数: {usage.get('completion_tokens', 'N/A')}") print(f" 总费用: {usage.get('total_price', 'N/A')} {usage.get('currency', 'USD')}") print(f"\n会话ID: {response.get('conversation_id', 'N/A')}") else: client.send_request_streaming(content, args.conversation_id, args.user) logging.info("请求执行完毕") except KeyboardInterrupt: logging.info("用户中断操作") print("\n操作被用户中断") sys.exit(0) except Exception as e: logging.error(f"未捕获的异常: {str(e)}", exc_info=True) print(f"发生未知错误: {str(e)}") sys.exit(1) if __name__ == "__main__": main()