xiaozhi / src /display /cli_display.py
nzjsdsk's picture
Upload 169 files
27e74f3 verified
import asyncio
import threading
import time
import os
from typing import Optional, Callable
from src.display.base_display import BaseDisplay
# 替换keyboard导入为pynput
from pynput import keyboard as pynput_keyboard
from src.utils.logging_config import get_logger
class CliDisplay(BaseDisplay):
def __init__(self):
super().__init__() # 调用父类初始化
"""初始化CLI显示"""
self.logger = get_logger(__name__)
self.running = True
# 状态相关
self.current_status = "未连接"
self.current_text = "待命"
self.current_emotion = "😊"
self.current_volume = 0 # 添加当前音量属性
# 回调函数
self.auto_callback = None
self.status_callback = None
self.text_callback = None
self.emotion_callback = None
self.abort_callback = None
self.send_text_callback = None
# 按键状态
self.is_r_pressed = False
# 状态缓存
self.last_status = None
self.last_text = None
self.last_emotion = None
self.last_volume = None
# 键盘监听器
self.keyboard_listener = None
# 为异步操作添加事件循环
self.loop = asyncio.new_event_loop()
def set_callbacks(self,
press_callback: Optional[Callable] = None,
release_callback: Optional[Callable] = None,
status_callback: Optional[Callable] = None,
text_callback: Optional[Callable] = None,
emotion_callback: Optional[Callable] = None,
mode_callback: Optional[Callable] = None,
auto_callback: Optional[Callable] = None,
abort_callback: Optional[Callable] = None,
send_text_callback: Optional[Callable] = None):
"""设置回调函数"""
self.status_callback = status_callback
self.text_callback = text_callback
self.emotion_callback = emotion_callback
self.auto_callback = auto_callback
self.abort_callback = abort_callback
self.send_text_callback = send_text_callback
def update_button_status(self, text: str):
"""更新按钮状态"""
print(f"按钮状态: {text}")
def update_status(self, status: str):
"""更新状态文本"""
if status != self.current_status:
self.current_status = status
self._print_current_status()
def update_text(self, text: str):
"""更新TTS文本"""
if text != self.current_text:
self.current_text = text
self._print_current_status()
def update_emotion(self, emotion_path: str):
"""更新表情
emotion_path: GIF文件路径或表情字符串
"""
if emotion_path != self.current_emotion:
# 如果是gif文件路径,提取文件名作为表情名
if emotion_path.endswith(".gif"):
# 从路径中提取文件名,去掉.gif后缀
emotion_name = os.path.basename(emotion_path).replace(".gif", "")
self.current_emotion = f"[{emotion_name}]"
else:
# 如果不是gif路径,则直接使用
self.current_emotion = emotion_path
self._print_current_status()
def start_keyboard_listener(self):
"""启动键盘监听"""
try:
def on_press(key):
try:
# F2 按键处理 - 自动对话
if key == pynput_keyboard.Key.f2:
if self.auto_callback:
self.auto_callback()
# F3 按键处理 - 打断
elif key == pynput_keyboard.Key.f3:
if self.abort_callback:
self.abort_callback()
except Exception as e:
self.logger.error(f"键盘事件处理错误: {e}")
# 创建并启动监听器
self.keyboard_listener = pynput_keyboard.Listener(
on_press=on_press
)
self.keyboard_listener.start()
self.logger.info("键盘监听器初始化成功")
except Exception as e:
self.logger.error(f"键盘监听器初始化失败: {e}")
def stop_keyboard_listener(self):
"""停止键盘监听"""
if self.keyboard_listener:
try:
self.keyboard_listener.stop()
self.keyboard_listener = None
self.logger.info("键盘监听器已停止")
except Exception as e:
self.logger.error(f"停止键盘监听器失败: {e}")
def start(self):
"""启动CLI显示"""
self._print_help()
# 启动状态更新线程
self.start_update_threads()
# 启动键盘监听线程
keyboard_thread = threading.Thread(target=self._keyboard_listener)
keyboard_thread.daemon = True
keyboard_thread.start()
# 启动键盘监听
self.start_keyboard_listener()
# 主循环
try:
while self.running:
time.sleep(0.1)
except KeyboardInterrupt:
self.on_close()
def on_close(self):
"""关闭CLI显示"""
self.running = False
print("\n正在关闭应用...")
self.stop_keyboard_listener()
def _print_help(self):
"""打印帮助信息"""
print("\n=== 小智Ai命令行控制 ===")
print("可用命令:")
print(" r - 开始/停止对话")
print(" x - 打断当前对话")
print(" s - 显示当前状态")
print(" v 数字 - 设置音量(0-100)")
print(" q - 退出程序")
print(" h - 显示此帮助信息")
print("=====================\n")
def _keyboard_listener(self):
"""键盘监听线程"""
try:
while self.running:
cmd = input().lower().strip()
if cmd == 'q':
self.on_close()
break
elif cmd == 'h':
self._print_help()
elif cmd == 'r':
if self.auto_callback:
self.auto_callback()
elif cmd == 'x':
if self.abort_callback:
self.abort_callback()
elif cmd == 's':
self._print_current_status()
elif cmd.startswith('v '): # 添加音量命令处理
try:
volume = int(cmd.split()[1]) # 获取音量值
if 0 <= volume <= 100:
self.update_volume(volume)
print(f"音量已设置为: {volume}%")
else:
print("音量必须在0-100之间")
except (IndexError, ValueError):
print("无效的音量值,格式:v <0-100>")
else:
if self.send_text_callback:
# 获取应用程序的事件循环并在其中运行协程
from src.application import Application
app = Application.get_instance()
if app and app.loop:
asyncio.run_coroutine_threadsafe(
self.send_text_callback(cmd),
app.loop
)
else:
print("应用程序实例或事件循环不可用")
except Exception as e:
self.logger.error(f"键盘监听错误: {e}")
def start_update_threads(self):
"""启动更新线程"""
def update_loop():
while self.running:
try:
# 更新状态
if self.status_callback:
status = self.status_callback()
if status and status != self.current_status:
self.update_status(status)
# 更新文本
if self.text_callback:
text = self.text_callback()
if text and text != self.current_text:
self.update_text(text)
# 更新表情
if self.emotion_callback:
emotion = self.emotion_callback()
if emotion and emotion != self.current_emotion:
self.update_emotion(emotion)
except Exception as e:
self.logger.error(f"状态更新错误: {e}")
time.sleep(0.1)
# 启动更新线程
threading.Thread(target=update_loop, daemon=True).start()
def _print_current_status(self):
"""打印当前状态"""
# 检查是否有状态变化
status_changed = (
self.current_status != self.last_status or
self.current_text != self.last_text or
self.current_emotion != self.last_emotion or
self.current_volume != self.last_volume
)
if status_changed:
print("\n=== 当前状态 ===")
print(f"状态: {self.current_status}")
print(f"文本: {self.current_text}")
print(f"表情: {self.current_emotion}")
print(f"音量: {self.current_volume}%")
print("===============\n")
# 更新缓存
self.last_status = self.current_status
self.last_text = self.current_text
self.last_emotion = self.current_emotion
self.last_volume = self.current_volume