xiaozhi / src /display /gui_display.py
nzjsdsk's picture
Upload 169 files
27e74f3 verified
import sys
import os
import logging
import threading
from pathlib import Path
from urllib.parse import urlparse
from PyQt5.QtCore import (
Qt, QTimer, QPropertyAnimation, QRect,
QEvent, QObject
)
from PyQt5.QtWidgets import (
QApplication, QWidget, QVBoxLayout,
QHBoxLayout, QLabel, QPushButton, QSlider, QLineEdit,
QComboBox, QCheckBox, QMessageBox, QFrame,
QStackedWidget, QTabBar, QStyleOptionSlider, QStyle,
QGraphicsOpacityEffect, QSizePolicy, QScrollArea, QGridLayout
)
from PyQt5.QtGui import (
QPainter, QColor, QFont, QMouseEvent, QMovie, QBrush, QPen,
QLinearGradient, QTransform, QPainterPath
)
from src.utils.config_manager import ConfigManager
import queue
import time
import numpy as np
from typing import Optional, Callable
from pynput import keyboard as pynput_keyboard
from abc import ABCMeta
from src.display.base_display import BaseDisplay
import json
# 定义配置文件路径
CONFIG_PATH = Path(__file__).parent.parent.parent / "config" / "config.json"
def restart_program():
"""使用 os.execv 重启当前 Python 程序。"""
try:
python = sys.executable
print(f"Attempting to restart with: {python} {sys.argv}")
# 尝试关闭 Qt 应用,虽然 execv 会接管,但这样做更规范
app = QApplication.instance()
if app:
app.quit()
# 替换当前进程
os.execv(python, [python] + sys.argv)
except Exception as e:
print(f"重启程序失败: {e}")
logging.getLogger("Display").error(f"重启程序失败: {e}", exc_info=True)
# 如果重启失败,可以选择退出或通知用户
sys.exit(1) # 或者弹出一个错误消息框
# 创建兼容的元类
class CombinedMeta(type(QObject), ABCMeta):
pass
class GuiDisplay(BaseDisplay, QObject, metaclass=CombinedMeta):
def __init__(self):
# 重要:调用 super() 处理多重继承
super().__init__()
QObject.__init__(self) # 调用 QObject 初始化
# 初始化日志
self.logger = logging.getLogger("Display")
self.app = None
self.root = None
# 一些提前初始化的变量
self.status_label = None
self.emotion_label = None
self.tts_text_label = None
self.volume_scale = None
self.manual_btn = None
self.abort_btn = None
self.auto_btn = None
self.mode_btn = None
self.mute = None
self.stackedWidget = None
self.nav_tab_bar = None
# 添加表情动画对象
self.emotion_movie = None
# 新增表情动画特效相关变量
self.emotion_effect = None # 表情透明度特效
self.emotion_animation = None # 表情动画对象
self.next_emotion_path = None # 下一个待显示的表情
self.is_emotion_animating = False # 是否正在进行表情切换动画
# 音量控制相关
self.volume_label = None # 音量百分比标签
self.volume_control_available = False # 系统音量控制是否可用
self.volume_controller_failed = False # 标记音量控制是否失败
# 麦克风可视化相关
self.mic_visualizer = None # 麦克风可视化组件
self.mic_timer = None # 麦克风音量更新定时器
self.is_listening = False # 是否正在监听
# 设置页面控件
self.wakeWordEnableSwitch = None
self.wakeWordsLineEdit = None
self.saveSettingsButton = None
# 新增网络和设备ID控件引用
self.deviceIdLineEdit = None
self.wsProtocolComboBox = None
self.wsAddressLineEdit = None
self.wsTokenLineEdit = None
# 新增OTA地址控件引用
self.otaProtocolComboBox = None
self.otaAddressLineEdit = None
# Home Assistant 控件引用
self.haProtocolComboBox = None
self.ha_server = None
self.ha_port = None
self.ha_key = None
self.Add_ha_devices = None
self.is_muted = False
self.pre_mute_volume = self.current_volume
# 对话模式标志
self.auto_mode = False
# 回调函数
self.button_press_callback = None
self.button_release_callback = None
self.status_update_callback = None
self.text_update_callback = None
self.emotion_update_callback = None
self.mode_callback = None
self.auto_callback = None
self.abort_callback = None
self.send_text_callback = None
# 更新队列
self.update_queue = queue.Queue()
# 运行标志
self._running = True
# 键盘监听器
self.keyboard_listener = None
# 滑动手势相关
self.last_mouse_pos = None
# 保存定时器引用以避免被销毁
self.update_timer = None
self.volume_update_timer = None
# 动画相关
self.current_effect = None
self.current_animation = None
self.animation = None
self.fade_widget = None
self.animated_widget = None
# 检查系统音量控制是否可用
self.volume_control_available = (hasattr(self, 'volume_controller') and
self.volume_controller is not None)
# 尝试获取一次系统音量,检测音量控制是否正常工作
self.get_current_volume()
# 新增iotPage相关变量
self.devices_list = []
self.device_labels = {}
self.history_title = None
self.iot_card = None
self.ha_update_timer = None
self.device_states = {}
def eventFilter(self, source, event):
if source == self.volume_scale and event.type() == QEvent.MouseButtonPress:
if event.button() == Qt.LeftButton:
slider = self.volume_scale
opt = QStyleOptionSlider()
slider.initStyleOption(opt)
# 获取滑块手柄和轨道的矩形区域
handle_rect = slider.style().subControlRect(
QStyle.CC_Slider, opt, QStyle.SC_SliderHandle, slider)
groove_rect = slider.style().subControlRect(
QStyle.CC_Slider, opt, QStyle.SC_SliderGroove, slider)
# 如果点击在手柄上,则让默认处理器处理拖动
if handle_rect.contains(event.pos()):
return False
# 计算点击位置相对于轨道的位置
if slider.orientation() == Qt.Horizontal:
# 确保点击在有效的轨道范围内
if (event.pos().x() < groove_rect.left() or
event.pos().x() > groove_rect.right()):
return False # 点击在轨道外部
pos = event.pos().x() - groove_rect.left()
max_pos = groove_rect.width()
else:
if (event.pos().y() < groove_rect.top() or
event.pos().y() > groove_rect.bottom()):
return False # 点击在轨道外部
pos = groove_rect.bottom() - event.pos().y()
max_pos = groove_rect.height()
if max_pos > 0: # 避免除以零
value_range = slider.maximum() - slider.minimum()
# 根据点击位置计算新的值
new_value = slider.minimum() + round(
(value_range * pos) / max_pos)
# 直接设置滑块的值
slider.setValue(int(new_value))
return True # 表示事件已处理
return super().eventFilter(source, event)
def _setup_navigation(self):
"""设置导航标签栏 (QTabBar)"""
# 使用 addTab 添加标签
self.nav_tab_bar.addTab("聊天") # index 0
self.nav_tab_bar.addTab("设备管理") # index 1
self.nav_tab_bar.addTab("参数配置") # index 2
# 将 QTabBar 的 currentChanged 信号连接到处理函数
self.nav_tab_bar.currentChanged.connect(self._on_navigation_index_changed)
# 设置默认选中项 (通过索引)
self.nav_tab_bar.setCurrentIndex(0) # 默认选中第一个标签
def _on_navigation_index_changed(self, index: int):
"""处理导航标签变化 (通过索引)"""
# 映射回 routeKey 以便复用动画和加载逻辑
index_to_routeKey = {0: "mainInterface", 1: "iotInterface", 2: "settingInterface"}
routeKey = index_to_routeKey.get(index)
if routeKey is None:
self.logger.warning(f"未知的导航索引: {index}")
return
target_index = index # 直接使用索引
if target_index == self.stackedWidget.currentIndex():
return
current_widget = self.stackedWidget.currentWidget()
self.stackedWidget.setCurrentIndex(target_index)
new_widget = self.stackedWidget.currentWidget()
# 如果切换到设置页面,加载设置
if routeKey == "settingInterface":
self._load_settings()
# 如果切换到设备管理页面,加载设备
if routeKey == "iotInterface":
self._load_iot_devices()
def set_callbacks(
self,
press_callback: Optional[Callable] = None,
release_callback: Optional[Callable] = None,
status_callback: Optional[Callable] = None,
text_callback: Optional[Callable] = None,
emotion_callback: Optional[Callable] = None,
mode_callback: Optional[Callable] = None,
auto_callback: Optional[Callable] = None,
abort_callback: Optional[Callable] = None,
send_text_callback: Optional[Callable] = None,
):
"""设置回调函数"""
self.button_press_callback = press_callback
self.button_release_callback = release_callback
self.status_update_callback = status_callback
self.text_update_callback = text_callback
self.emotion_update_callback = emotion_callback
self.mode_callback = mode_callback
self.auto_callback = auto_callback
self.abort_callback = abort_callback
self.send_text_callback = send_text_callback
def _process_updates(self):
"""处理更新队列"""
if not self._running:
return
try:
while True:
try:
# 非阻塞方式获取更新
update_func = self.update_queue.get_nowait()
update_func()
self.update_queue.task_done()
except queue.Empty:
break
except Exception as e:
self.logger.error(f"处理更新队列时发生错误: {e}")
def _on_manual_button_press(self):
"""手动模式按钮按下事件处理"""
try:
# 更新按钮文本为"松开以停止"
if self.manual_btn and self.manual_btn.isVisible():
self.manual_btn.setText("松开以停止")
# 调用回调函数
if self.button_press_callback:
self.button_press_callback()
except Exception as e:
self.logger.error(f"按钮按下回调执行失败: {e}")
def _on_manual_button_release(self):
"""手动模式按钮释放事件处理"""
try:
# 更新按钮文本为"按住后说话"
if self.manual_btn and self.manual_btn.isVisible():
self.manual_btn.setText("按住后说话")
# 调用回调函数
if self.button_release_callback:
self.button_release_callback()
except Exception as e:
self.logger.error(f"按钮释放回调执行失败: {e}")
def _on_auto_button_click(self):
"""自动模式按钮点击事件处理"""
try:
if self.auto_callback:
self.auto_callback()
except Exception as e:
self.logger.error(f"自动模式按钮回调执行失败: {e}")
def _on_abort_button_click(self):
"""处理中止按钮点击事件"""
if self.abort_callback:
self.abort_callback()
def _on_mode_button_click(self):
"""对话模式切换按钮点击事件"""
try:
# 检查是否可以切换模式(通过回调函数询问应用程序当前状态)
if self.mode_callback:
# 如果回调函数返回False,表示当前不能切换模式
if not self.mode_callback(not self.auto_mode):
return
# 切换模式
self.auto_mode = not self.auto_mode
# 更新按钮显示
if self.auto_mode:
# 切换到自动模式
self.update_mode_button_status("自动对话")
# 隐藏手动按钮,显示自动按钮
self.update_queue.put(self._switch_to_auto_mode)
else:
# 切换到手动模式
self.update_mode_button_status("手动对话")
# 隐藏自动按钮,显示手动按钮
self.update_queue.put(self._switch_to_manual_mode)
except Exception as e:
self.logger.error(f"模式切换按钮回调执行失败: {e}")
def _switch_to_auto_mode(self):
"""切换到自动模式的UI更新"""
if self.manual_btn and self.auto_btn:
self.manual_btn.hide()
self.auto_btn.show()
def _switch_to_manual_mode(self):
"""切换到手动模式的UI更新"""
if self.manual_btn and self.auto_btn:
self.auto_btn.hide()
self.manual_btn.show()
def update_status(self, status: str):
"""更新状态文本 (只更新主状态)"""
full_status_text = f"状态: {status}"
self.update_queue.put(lambda: self._safe_update_label(self.status_label, full_status_text))
# 根据状态更新麦克风可视化
if "聆听中" in status:
self.update_queue.put(self._start_mic_visualization)
elif "待命" in status or "说话中" in status:
self.update_queue.put(self._stop_mic_visualization)
def update_text(self, text: str):
"""更新TTS文本"""
self.update_queue.put(lambda: self._safe_update_label(self.tts_text_label, text))
def update_emotion(self, emotion_path: str):
"""更新表情,使用GIF动画显示"""
# 确保使用绝对路径
abs_path = os.path.abspath(emotion_path)
# 检查是否与上次设置的表情相同,避免重复设置
if hasattr(self, 'last_emotion_path') and self.last_emotion_path == abs_path:
return # 如果是相同的表情路径,直接返回不重复设置
# 更新缓存的路径
self.last_emotion_path = abs_path
self.logger.info(f"设置表情GIF: {abs_path}")
self.update_queue.put(lambda: self._set_emotion_gif(self.emotion_label, abs_path))
def _set_emotion_gif(self, label, gif_path):
"""设置GIF动画到标签,带淡入淡出效果"""
if not label or self.root.isHidden():
return
try:
# 检查文件是否存在
if not os.path.exists(gif_path):
self.logger.error(f"GIF文件不存在: {gif_path}")
label.setText("😊")
return
# 如果当前已经设置了相同路径的动画,且正在播放,则不重复设置
if (self.emotion_movie and
getattr(self.emotion_movie, '_gif_path', None) == gif_path and
self.emotion_movie.state() == QMovie.Running):
return
# 如果正在进行动画,则只记录下一个待显示的表情,等当前动画完成后再切换
if self.is_emotion_animating:
self.next_emotion_path = gif_path
return
self.logger.info(f"加载GIF文件: {gif_path}")
# 标记正在进行动画
self.is_emotion_animating = True
# 如果已有动画在播放,先淡出当前动画
if self.emotion_movie and label.movie() == self.emotion_movie:
# 创建透明度效果(如果尚未创建)
if not self.emotion_effect:
self.emotion_effect = QGraphicsOpacityEffect(label)
label.setGraphicsEffect(self.emotion_effect)
self.emotion_effect.setOpacity(1.0)
# 创建淡出动画
self.emotion_animation = QPropertyAnimation(self.emotion_effect, b"opacity")
self.emotion_animation.setDuration(180) # 设置动画持续时间(毫秒)
self.emotion_animation.setStartValue(1.0)
self.emotion_animation.setEndValue(0.25)
# 当淡出完成后,设置新的GIF并开始淡入
def on_fade_out_finished():
try:
# 停止当前GIF
if self.emotion_movie:
self.emotion_movie.stop()
# 设置新的GIF并淡入
self._set_new_emotion_gif(label, gif_path)
except Exception as e:
self.logger.error(f"淡出动画完成后设置GIF失败: {e}")
self.is_emotion_animating = False
# 连接淡出完成信号
self.emotion_animation.finished.connect(on_fade_out_finished)
# 开始淡出动画
self.emotion_animation.start()
else:
# 如果没有之前的动画,直接设置新的GIF并淡入
self._set_new_emotion_gif(label, gif_path)
except Exception as e:
self.logger.error(f"更新表情GIF动画失败: {e}")
# 如果GIF加载失败,尝试显示默认表情
try:
label.setText("😊")
except Exception:
pass
self.is_emotion_animating = False
def _set_new_emotion_gif(self, label, gif_path):
"""设置新的GIF动画并执行淡入效果"""
try:
# 创建动画对象
movie = QMovie(gif_path)
if not movie.isValid():
self.logger.error(f"无效的GIF文件: {gif_path}")
label.setText("😊")
self.is_emotion_animating = False
return
# 配置动画
movie.setCacheMode(QMovie.CacheAll)
# 保存GIF路径到movie对象,用于比较
movie._gif_path = gif_path
# 连接信号
movie.error.connect(lambda: self.logger.error(f"GIF播放错误: {movie.lastError()}"))
# 保存新的动画对象
self.emotion_movie = movie
# 设置标签大小策略
label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
label.setAlignment(Qt.AlignCenter)
# 设置动画到标签
label.setMovie(movie)
# 设置QMovie的速度为110,使动画更流畅(默认是100)
movie.setSpeed(105)
# 确保不透明度是0(完全透明)
if self.emotion_effect:
self.emotion_effect.setOpacity(0.0)
else:
self.emotion_effect = QGraphicsOpacityEffect(label)
label.setGraphicsEffect(self.emotion_effect)
self.emotion_effect.setOpacity(0.0)
# 开始播放动画
movie.start()
# 创建淡入动画
self.emotion_animation = QPropertyAnimation(self.emotion_effect, b"opacity")
self.emotion_animation.setDuration(180) # 淡入时间(毫秒)
self.emotion_animation.setStartValue(0.25)
self.emotion_animation.setEndValue(1.0)
# 淡入完成后检查是否有下一个待显示的表情
def on_fade_in_finished():
self.is_emotion_animating = False
# 如果有下一个待显示的表情,则继续切换
if self.next_emotion_path:
next_path = self.next_emotion_path
self.next_emotion_path = None
self._set_emotion_gif(label, next_path)
# 连接淡入完成信号
self.emotion_animation.finished.connect(on_fade_in_finished)
# 开始淡入动画
self.emotion_animation.start()
except Exception as e:
self.logger.error(f"设置新的GIF动画失败: {e}")
self.is_emotion_animating = False
# 如果设置失败,尝试显示默认表情
try:
label.setText("😊")
except Exception:
pass
def _safe_update_label(self, label, text):
"""安全地更新标签文本"""
if label and not self.root.isHidden():
try:
label.setText(text)
except RuntimeError as e:
self.logger.error(f"更新标签失败: {e}")
def start_update_threads(self):
"""启动更新线程"""
# 初始化表情缓存
self.last_emotion_path = None
def update_loop():
while self._running:
try:
# 更新状态
if self.status_update_callback:
status = self.status_update_callback()
if status:
self.update_status(status)
# 更新文本
if self.text_update_callback:
text = self.text_update_callback()
if text:
self.update_text(text)
# 更新表情 - 只在表情变化时更新
if self.emotion_update_callback:
emotion = self.emotion_update_callback()
if emotion:
# 直接调用update_emotion方法,它会处理重复检查
self.update_emotion(emotion)
except Exception as e:
self.logger.error(f"更新失败: {e}")
time.sleep(0.1)
threading.Thread(target=update_loop, daemon=True).start()
def on_close(self):
"""关闭窗口处理"""
self._running = False
if self.update_timer:
self.update_timer.stop()
if self.mic_timer:
self.mic_timer.stop()
if self.root:
self.root.close()
self.stop_keyboard_listener()
def start(self):
"""启动GUI"""
try:
# 确保QApplication实例在主线程中创建
self.app = QApplication.instance()
if self.app is None:
self.app = QApplication(sys.argv)
# 设置UI默认字体
default_font = QFont("ASLantTermuxFont Mono", 12)
self.app.setFont(default_font)
# 加载UI文件
from PyQt5 import uic
self.root = QWidget()
ui_path = Path(__file__).parent / "gui_display.ui"
if not ui_path.exists():
self.logger.error(f"UI文件不存在: {ui_path}")
raise FileNotFoundError(f"UI文件不存在: {ui_path}")
uic.loadUi(str(ui_path), self.root)
# 获取UI中的控件
self.status_label = self.root.findChild(QLabel, "status_label")
self.emotion_label = self.root.findChild(QLabel, "emotion_label")
self.tts_text_label = self.root.findChild(QLabel, "tts_text_label")
self.manual_btn = self.root.findChild(QPushButton, "manual_btn")
self.abort_btn = self.root.findChild(QPushButton, "abort_btn")
self.auto_btn = self.root.findChild(QPushButton, "auto_btn")
self.mode_btn = self.root.findChild(QPushButton, "mode_btn")
# 获取IOT页面控件
self.iot_card = self.root.findChild(QFrame, "iotPage") # 注意这里使用 "iotPage" 作为ID
if self.iot_card is None:
# 如果找不到 iotPage,尝试其他可能的名称
self.iot_card = self.root.findChild(QFrame, "iot_card")
if self.iot_card is None:
# 如果还找不到,尝试在 stackedWidget 中获取第二个页面作为 iot_card
self.stackedWidget = self.root.findChild(QStackedWidget, "stackedWidget")
if self.stackedWidget and self.stackedWidget.count() > 1:
self.iot_card = self.stackedWidget.widget(1) # 索引1是第二个页面
self.logger.info(f"使用 stackedWidget 的第2个页面作为 iot_card: {self.iot_card}")
else:
self.logger.warning("无法找到 iot_card,IOT设备功能将不可用")
else:
self.logger.info(f"找到 iot_card: {self.iot_card}")
# 音频控制栈组件
self.audio_control_stack = self.root.findChild(QStackedWidget, "audio_control_stack")
self.volume_page = self.root.findChild(QWidget, "volume_page")
self.mic_page = self.root.findChild(QWidget, "mic_page")
# 音量控制组件
self.volume_scale = self.root.findChild(QSlider, "volume_scale")
self.mute = self.root.findChild(QPushButton, "mute")
if self.mute:
self.mute.setCheckable(True)
self.mute.clicked.connect(self._on_mute_click)
# 获取或创建音量百分比标签
self.volume_label = self.root.findChild(QLabel, "volume_label")
if not self.volume_label and self.volume_scale:
# 如果UI中没有音量标签,动态创建一个
volume_layout = self.root.findChild(QHBoxLayout, "volume_layout")
if volume_layout:
self.volume_label = QLabel(f"{self.current_volume}%")
self.volume_label.setObjectName("volume_label")
self.volume_label.setMinimumWidth(40)
self.volume_label.setAlignment(Qt.AlignCenter)
volume_layout.addWidget(self.volume_label)
# 初始化麦克风可视化组件 - 使用UI中定义的QFrame
self.mic_visualizer_card = self.root.findChild(QFrame, "mic_visualizer_card")
self.mic_visualizer_widget = self.root.findChild(QWidget, "mic_visualizer_widget")
if self.mic_visualizer_widget:
# 创建可视化组件实例
self.mic_visualizer = MicrophoneVisualizer(self.mic_visualizer_widget)
# 设置布局以使可视化组件填充整个区域
layout = QVBoxLayout(self.mic_visualizer_widget)
layout.setContentsMargins(0, 0, 0, 0)
layout.addWidget(self.mic_visualizer)
# 创建更新定时器,但不启动
self.mic_timer = QTimer()
self.mic_timer.timeout.connect(self._update_mic_visualizer)
# 根据音量控制可用性设置组件状态
volume_control_working = self.volume_control_available and not self.volume_controller_failed
if not volume_control_working:
self.logger.warning("系统不支持音量控制或控制失败,音量控制功能已禁用")
# 禁用音量相关控件
if self.volume_scale:
self.volume_scale.setEnabled(False)
if self.mute:
self.mute.setEnabled(False)
if self.volume_label:
self.volume_label.setText("不可用")
else:
# 正常设置音量滑块初始值
if self.volume_scale:
self.volume_scale.setRange(0, 100)
self.volume_scale.setValue(self.current_volume)
self.volume_scale.valueChanged.connect(self._on_volume_change)
self.volume_scale.installEventFilter(self) # 安装事件过滤器
# 更新音量百分比显示
if self.volume_label:
self.volume_label.setText(f"{self.current_volume}%")
# 获取设置页面控件
self.wakeWordEnableSwitch = self.root.findChild(QCheckBox, "wakeWordEnableSwitch")
self.wakeWordsLineEdit = self.root.findChild(QLineEdit, "wakeWordsLineEdit")
self.saveSettingsButton = self.root.findChild(QPushButton, "saveSettingsButton")
# 获取新增的控件
# 使用 PyQt 标准控件替换
self.deviceIdLineEdit = self.root.findChild(QLineEdit, "deviceIdLineEdit")
self.wsProtocolComboBox = self.root.findChild(QComboBox, "wsProtocolComboBox")
self.wsAddressLineEdit = self.root.findChild(QLineEdit, "wsAddressLineEdit")
self.wsTokenLineEdit = self.root.findChild(QLineEdit, "wsTokenLineEdit")
# Home Assistant 控件引用
self.haProtocolComboBox = self.root.findChild(QComboBox, "haProtocolComboBox")
self.ha_server = self.root.findChild(QLineEdit, "ha_server")
self.ha_port = self.root.findChild(QLineEdit, "ha_port")
self.ha_key = self.root.findChild(QLineEdit, "ha_key")
self.Add_ha_devices = self.root.findChild(QPushButton, "Add_ha_devices")
# 获取 OTA 相关控件
self.otaProtocolComboBox = self.root.findChild(QComboBox, "otaProtocolComboBox")
self.otaAddressLineEdit = self.root.findChild(QLineEdit, "otaAddressLineEdit")
# 显式添加 ComboBox 选项,以防 UI 文件加载问题
if self.wsProtocolComboBox:
# 先清空,避免重复添加 (如果 .ui 文件也成功加载了选项)
self.wsProtocolComboBox.clear()
self.wsProtocolComboBox.addItems(["wss://", "ws://"])
# 显式添加OTA ComboBox选项
if self.otaProtocolComboBox:
self.otaProtocolComboBox.clear()
self.otaProtocolComboBox.addItems(["https://", "http://"])
# 显式添加 Home Assistant 协议下拉框选项
if self.haProtocolComboBox:
self.haProtocolComboBox.clear()
self.haProtocolComboBox.addItems(["http://", "https://"])
# 获取导航控件
self.stackedWidget = self.root.findChild(QStackedWidget, "stackedWidget")
self.nav_tab_bar = self.root.findChild(QTabBar, "nav_tab_bar")
# 初始化导航标签栏
self._setup_navigation()
# 连接按钮事件
if self.manual_btn:
self.manual_btn.pressed.connect(self._on_manual_button_press)
self.manual_btn.released.connect(self._on_manual_button_release)
if self.abort_btn:
self.abort_btn.clicked.connect(self._on_abort_button_click)
if self.auto_btn:
self.auto_btn.clicked.connect(self._on_auto_button_click)
# 默认隐藏自动模式按钮
self.auto_btn.hide()
if self.mode_btn:
self.mode_btn.clicked.connect(self._on_mode_button_click)
# 初始化文本输入框和发送按钮
self.text_input = self.root.findChild(QLineEdit, "text_input")
self.send_btn = self.root.findChild(QPushButton, "send_btn")
if self.text_input and self.send_btn:
self.send_btn.clicked.connect(self._on_send_button_click)
# 绑定Enter键发送文本
self.text_input.returnPressed.connect(self._on_send_button_click)
# 连接设置保存按钮事件
if self.saveSettingsButton:
self.saveSettingsButton.clicked.connect(self._save_settings)
# 连接Home Assistant设备导入按钮事件
if self.Add_ha_devices:
self.Add_ha_devices.clicked.connect(self._on_add_ha_devices_click)
# 设置鼠标事件
self.root.mousePressEvent = self.mousePressEvent
self.root.mouseReleaseEvent = self.mouseReleaseEvent
# 启动键盘监听
self.start_keyboard_listener()
# 启动更新线程
self.start_update_threads()
# 定时器处理更新队列
self.update_timer = QTimer()
self.update_timer.timeout.connect(self._process_updates)
self.update_timer.start(100)
# 在主线程中运行主循环
self.logger.info("开始启动GUI主循环")
self.root.show()
# self.root.showFullScreen() # 全屏显示
except Exception as e:
self.logger.error(f"GUI启动失败: {e}", exc_info=True)
# 尝试回退到CLI模式
print(f"GUI启动失败: {e},请尝试使用CLI模式")
raise
def update_mode_button_status(self, text: str):
"""更新模式按钮状态"""
self.update_queue.put(lambda: self._safe_update_button(self.mode_btn, text))
def update_button_status(self, text: str):
"""更新按钮状态 - 保留此方法以满足抽象基类要求"""
# 根据当前模式更新相应的按钮
if self.auto_mode:
self.update_queue.put(lambda: self._safe_update_button(self.auto_btn, text))
else:
# 在手动模式下,不通过此方法更新按钮文本
# 因为按钮文本由按下/释放事件直接控制
pass
def _safe_update_button(self, button, text):
"""安全地更新按钮文本"""
if button and not self.root.isHidden():
try:
button.setText(text)
except RuntimeError as e:
self.logger.error(f"更新按钮失败: {e}")
def _on_volume_change(self, value):
"""处理音量滑块变化,使用节流"""
def update_volume():
self.update_volume(value)
# 取消之前的定时器
if hasattr(self, "volume_update_timer") and self.volume_update_timer and self.volume_update_timer.isActive():
self.volume_update_timer.stop()
# 设置新的定时器,300ms 后更新音量
self.volume_update_timer = QTimer()
self.volume_update_timer.setSingleShot(True)
self.volume_update_timer.timeout.connect(update_volume)
self.volume_update_timer.start(300)
def update_volume(self, volume: int):
"""重写父类的update_volume方法,确保UI同步更新"""
# 检查音量控制是否可用
if not self.volume_control_available or self.volume_controller_failed:
return
# 调用父类的update_volume方法更新系统音量
super().update_volume(volume)
# 更新UI音量滑块和标签
if not self.root.isHidden():
try:
if self.volume_scale:
self.volume_scale.setValue(volume)
if self.volume_label:
self.volume_label.setText(f"{volume}%")
except RuntimeError as e:
self.logger.error(f"更新音量UI失败: {e}")
def start_keyboard_listener(self):
"""启动键盘监听"""
try:
def on_press(key):
try:
# F2 按键处理 - 在手动模式下处理
if key == pynput_keyboard.Key.f2 and not self.auto_mode:
if self.button_press_callback:
self.button_press_callback()
if self.manual_btn:
self.update_queue.put(lambda: self._safe_update_button(self.manual_btn, "松开以停止"))
# F3 按键处理 - 打断
elif key == pynput_keyboard.Key.f3:
if self.abort_callback:
self.abort_callback()
except Exception as e:
self.logger.error(f"键盘事件处理错误: {e}")
def on_release(key):
try:
# F2 释放处理 - 在手动模式下处理
if key == pynput_keyboard.Key.f2 and not self.auto_mode:
if self.button_release_callback:
self.button_release_callback()
if self.manual_btn:
self.update_queue.put(lambda: self._safe_update_button(self.manual_btn, "按住后说话"))
except Exception as e:
self.logger.error(f"键盘事件处理错误: {e}")
# 创建并启动监听器
self.keyboard_listener = pynput_keyboard.Listener(
on_press=on_press, on_release=on_release
)
self.keyboard_listener.start()
self.logger.info("键盘监听器初始化成功")
except Exception as e:
self.logger.error(f"键盘监听器初始化失败: {e}")
def stop_keyboard_listener(self):
"""停止键盘监听"""
if self.keyboard_listener:
try:
self.keyboard_listener.stop()
self.keyboard_listener = None
self.logger.info("键盘监听器已停止")
except Exception as e:
self.logger.error(f"停止键盘监听器失败: {e}")
def mousePressEvent(self, event: QMouseEvent):
"""鼠标按下事件处理"""
if event.button() == Qt.LeftButton:
self.last_mouse_pos = event.pos()
def mouseReleaseEvent(self, event: QMouseEvent):
"""鼠标释放事件处理 (修改为使用 QTabBar 索引)"""
if event.button() == Qt.LeftButton and self.last_mouse_pos is not None:
delta = event.pos().x() - self.last_mouse_pos.x()
self.last_mouse_pos = None
if abs(delta) > 100: # 滑动阈值
current_index = self.nav_tab_bar.currentIndex() if self.nav_tab_bar else 0
tab_count = self.nav_tab_bar.count() if self.nav_tab_bar else 0
if delta > 0 and current_index > 0: # 右滑
new_index = current_index - 1
if self.nav_tab_bar: self.nav_tab_bar.setCurrentIndex(new_index)
elif delta < 0 and current_index < tab_count - 1: # 左滑
new_index = current_index + 1
if self.nav_tab_bar: self.nav_tab_bar.setCurrentIndex(new_index)
def _on_mute_click(self):
"""静音按钮点击事件处理 (使用 isChecked 状态)"""
try:
if not self.volume_control_available or self.volume_controller_failed or not self.mute:
return
self.is_muted = self.mute.isChecked() # 获取按钮的选中状态
if self.is_muted:
# 保存当前音量并设置为0
self.pre_mute_volume = self.current_volume
self.update_volume(0)
self.mute.setText("取消静音") # 更新文本
if self.volume_label:
self.volume_label.setText("静音") # 或者 "0%"
else:
# 恢复之前的音量
self.update_volume(self.pre_mute_volume)
self.mute.setText("点击静音") # 恢复文本
if self.volume_label:
self.volume_label.setText(f"{self.pre_mute_volume}%")
except Exception as e:
self.logger.error(f"静音按钮点击事件处理失败: {e}")
def _load_settings(self):
"""加载配置文件并更新设置页面UI (使用ConfigManager)"""
try:
# 使用ConfigManager获取配置
config_manager = ConfigManager.get_instance()
# 获取唤醒词配置
use_wake_word = config_manager.get_config("WAKE_WORD_OPTIONS.USE_WAKE_WORD", False)
wake_words = config_manager.get_config("WAKE_WORD_OPTIONS.WAKE_WORDS", [])
if self.wakeWordEnableSwitch:
self.wakeWordEnableSwitch.setChecked(use_wake_word)
if self.wakeWordsLineEdit:
self.wakeWordsLineEdit.setText(", ".join(wake_words))
# 获取系统选项
device_id = config_manager.get_config("SYSTEM_OPTIONS.DEVICE_ID", "")
websocket_url = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.WEBSOCKET_URL", "")
websocket_token = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.WEBSOCKET_ACCESS_TOKEN", "")
ota_url = config_manager.get_config("SYSTEM_OPTIONS.NETWORK.OTA_VERSION_URL", "")
if self.deviceIdLineEdit:
self.deviceIdLineEdit.setText(device_id)
# 解析 WebSocket URL 并设置协议和地址
if websocket_url and self.wsProtocolComboBox and self.wsAddressLineEdit:
try:
parsed_url = urlparse(websocket_url)
protocol = parsed_url.scheme
# 保留URL末尾的斜杠
address = parsed_url.netloc + parsed_url.path
# 确保地址不以协议开头
if address.startswith(f"{protocol}://"):
address = address[len(f"{protocol}://"):]
index = self.wsProtocolComboBox.findText(f"{protocol}://", Qt.MatchFixedString)
if index >= 0:
self.wsProtocolComboBox.setCurrentIndex(index)
else:
self.logger.warning(f"未知的 WebSocket 协议: {protocol}")
self.wsProtocolComboBox.setCurrentIndex(0) # 默认为 wss
self.wsAddressLineEdit.setText(address)
except Exception as e:
self.logger.error(f"解析 WebSocket URL 时出错: {websocket_url} - {e}")
self.wsProtocolComboBox.setCurrentIndex(0)
self.wsAddressLineEdit.clear()
if self.wsTokenLineEdit:
self.wsTokenLineEdit.setText(websocket_token)
# 解析OTA URL并设置协议和地址
if ota_url and self.otaProtocolComboBox and self.otaAddressLineEdit:
try:
parsed_url = urlparse(ota_url)
protocol = parsed_url.scheme
# 保留URL末尾的斜杠
address = parsed_url.netloc + parsed_url.path
# 确保地址不以协议开头
if address.startswith(f"{protocol}://"):
address = address[len(f"{protocol}://"):]
if protocol == "https":
self.otaProtocolComboBox.setCurrentIndex(0)
elif protocol == "http":
self.otaProtocolComboBox.setCurrentIndex(1)
else:
self.logger.warning(f"未知的OTA协议: {protocol}")
self.otaProtocolComboBox.setCurrentIndex(0) # 默认为https
self.otaAddressLineEdit.setText(address)
except Exception as e:
self.logger.error(f"解析OTA URL时出错: {ota_url} - {e}")
self.otaProtocolComboBox.setCurrentIndex(0)
self.otaAddressLineEdit.clear()
# 加载Home Assistant配置
ha_options = config_manager.get_config("HOME_ASSISTANT", {})
ha_url = ha_options.get("URL", "")
ha_token = ha_options.get("TOKEN", "")
# 解析Home Assistant URL并设置协议和地址
if ha_url and self.haProtocolComboBox and self.ha_server:
try:
parsed_url = urlparse(ha_url)
protocol = parsed_url.scheme
port = parsed_url.port
# 地址部分不包含端口
address = parsed_url.netloc
if ":" in address: # 如果地址中包含端口号
address = address.split(":")[0]
# 设置协议
if protocol == "https":
self.haProtocolComboBox.setCurrentIndex(1)
else: # http或其他协议,默认http
self.haProtocolComboBox.setCurrentIndex(0)
# 设置地址
self.ha_server.setText(address)
# 设置端口(如果有)
if port and self.ha_port:
self.ha_port.setText(str(port))
except Exception as e:
self.logger.error(f"解析Home Assistant URL时出错: {ha_url} - {e}")
# 出错时使用默认值
self.haProtocolComboBox.setCurrentIndex(0) # 默认为http
self.ha_server.clear()
# 设置Home Assistant Token
if self.ha_key:
self.ha_key.setText(ha_token)
except Exception as e:
self.logger.error(f"加载配置文件时出错: {e}", exc_info=True)
QMessageBox.critical(self.root, "错误", f"加载设置失败: {e}")
def _save_settings(self):
"""保存设置页面的更改到配置文件 (使用ConfigManager)"""
try:
# 使用ConfigManager获取实例
config_manager = ConfigManager.get_instance()
# 收集所有UI界面上的配置值
# 唤醒词配置
use_wake_word = self.wakeWordEnableSwitch.isChecked() if self.wakeWordEnableSwitch else False
wake_words_text = self.wakeWordsLineEdit.text() if self.wakeWordsLineEdit else ""
wake_words = [word.strip() for word in wake_words_text.split(',') if word.strip()]
# 系统选项
new_device_id = self.deviceIdLineEdit.text() if self.deviceIdLineEdit else ""
selected_protocol_text = self.wsProtocolComboBox.currentText() if self.wsProtocolComboBox else "wss://"
selected_protocol = selected_protocol_text.replace("://", "")
new_ws_address = self.wsAddressLineEdit.text() if self.wsAddressLineEdit else ""
new_ws_token = self.wsTokenLineEdit.text() if self.wsTokenLineEdit else ""
# OTA地址配置
selected_ota_protocol_text = self.otaProtocolComboBox.currentText() if self.otaProtocolComboBox else "https://"
selected_ota_protocol = selected_ota_protocol_text.replace("://", "")
new_ota_address = self.otaAddressLineEdit.text() if self.otaAddressLineEdit else ""
# 确保地址不以 / 开头
if new_ws_address.startswith('/'):
new_ws_address = new_ws_address[1:]
# 构造WebSocket URL
new_websocket_url = f"{selected_protocol}://{new_ws_address}"
if new_websocket_url and not new_websocket_url.endswith('/'):
new_websocket_url += '/'
# 构造OTA URL
new_ota_url = f"{selected_ota_protocol}://{new_ota_address}"
if new_ota_url and not new_ota_url.endswith('/'):
new_ota_url += '/'
# Home Assistant配置
ha_protocol = self.haProtocolComboBox.currentText().replace("://", "") if self.haProtocolComboBox else "http"
ha_server = self.ha_server.text() if self.ha_server else ""
ha_port = self.ha_port.text() if self.ha_port else ""
ha_key = self.ha_key.text() if self.ha_key else ""
# 构建Home Assistant URL
if ha_server:
ha_url = f"{ha_protocol}://{ha_server}"
if ha_port:
ha_url += f":{ha_port}"
else:
ha_url = ""
# 获取完整的当前配置
current_config = config_manager._config.copy()
# 直接从磁盘读取最新的config.json,获取最新的设备列表
try:
import json
config_path = Path(__file__).parent.parent.parent / "config" / "config.json"
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
disk_config = json.load(f)
# 获取磁盘上最新的设备列表
if ("HOME_ASSISTANT" in disk_config and
"DEVICES" in disk_config["HOME_ASSISTANT"]):
# 使用磁盘上的设备列表
latest_devices = disk_config["HOME_ASSISTANT"]["DEVICES"]
self.logger.info(f"从配置文件读取了 {len(latest_devices)} 个设备")
else:
latest_devices = []
else:
latest_devices = []
except Exception as e:
self.logger.error(f"读取配置文件中的设备列表失败: {e}")
# 如果读取失败,使用内存中的设备列表
if "HOME_ASSISTANT" in current_config and "DEVICES" in current_config["HOME_ASSISTANT"]:
latest_devices = current_config["HOME_ASSISTANT"]["DEVICES"]
else:
latest_devices = []
# 更新配置对象(不写入文件)
# 1. 更新唤醒词配置
if "WAKE_WORD_OPTIONS" not in current_config:
current_config["WAKE_WORD_OPTIONS"] = {}
current_config["WAKE_WORD_OPTIONS"]["USE_WAKE_WORD"] = use_wake_word
current_config["WAKE_WORD_OPTIONS"]["WAKE_WORDS"] = wake_words
# 2. 更新系统选项
if "SYSTEM_OPTIONS" not in current_config:
current_config["SYSTEM_OPTIONS"] = {}
current_config["SYSTEM_OPTIONS"]["DEVICE_ID"] = new_device_id
if "NETWORK" not in current_config["SYSTEM_OPTIONS"]:
current_config["SYSTEM_OPTIONS"]["NETWORK"] = {}
current_config["SYSTEM_OPTIONS"]["NETWORK"]["WEBSOCKET_URL"] = new_websocket_url
current_config["SYSTEM_OPTIONS"]["NETWORK"]["WEBSOCKET_ACCESS_TOKEN"] = new_ws_token
current_config["SYSTEM_OPTIONS"]["NETWORK"]["OTA_VERSION_URL"] = new_ota_url
# 3. 更新Home Assistant配置
if "HOME_ASSISTANT" not in current_config:
current_config["HOME_ASSISTANT"] = {}
current_config["HOME_ASSISTANT"]["URL"] = ha_url
current_config["HOME_ASSISTANT"]["TOKEN"] = ha_key
# 使用最新的设备列表
current_config["HOME_ASSISTANT"]["DEVICES"] = latest_devices
# 一次性保存整个配置
save_success = config_manager._save_config(current_config)
if save_success:
self.logger.info("设置已成功保存到 config.json")
reply = QMessageBox.question(self.root, "保存成功",
"设置已保存。\n部分设置需要重启应用程序才能生效。\n\n是否立即重启?",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.Yes:
self.logger.info("用户选择重启应用程序。")
restart_program()
else:
raise Exception("保存配置文件失败")
except Exception as e:
self.logger.error(f"保存设置时发生未知错误: {e}", exc_info=True)
QMessageBox.critical(self.root, "错误", f"保存设置失败: {e}")
def _on_add_ha_devices_click(self):
"""处理添加Home Assistant设备按钮点击事件"""
try:
self.logger.info("启动Home Assistant设备管理器...")
# 获取当前文件所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 获取项目根目录(假设current_dir是src/display,上级目录就是src,再上级就是项目根目录)
project_root = os.path.dirname(os.path.dirname(current_dir))
# 获取脚本路径
script_path = os.path.join(project_root, "scripts", "ha_device_manager_ui.py")
if not os.path.exists(script_path):
self.logger.error(f"设备管理器脚本不存在: {script_path}")
QMessageBox.critical(self.root, "错误", "设备管理器脚本不存在")
return
# 构建命令并执行
cmd = [sys.executable, script_path]
# 使用subprocess启动新进程
import subprocess
subprocess.Popen(cmd)
except Exception as e:
self.logger.error(f"启动Home Assistant设备管理器失败: {e}", exc_info=True)
QMessageBox.critical(self.root, "错误", f"启动设备管理器失败: {e}")
def _update_mic_visualizer(self):
"""更新麦克风可视化"""
if not self.is_listening or not self.mic_visualizer:
return
try:
# 获取当前麦克风音量级别,范围0-1
volume_level = self._get_current_mic_level()
# 更新可视化组件
self.mic_visualizer.set_volume(min(1.0, volume_level))
except Exception as e:
self.logger.error(f"更新麦克风可视化失败: {e}")
def _get_current_mic_level(self):
"""获取当前麦克风音量级别"""
try:
from src.application import Application
app = Application.get_instance()
if app and hasattr(app, 'audio_codec') and app.audio_codec:
# 从音频编解码器获取原始音频数据
if hasattr(app.audio_codec, 'input_stream') and app.audio_codec.input_stream:
# 读取音频数据并计算音量级别
try:
# 获取输入流中可读取的数据量
available = app.audio_codec.input_stream.get_read_available()
if available > 0:
# 读取一小块数据用于计算音量
chunk_size = min(1024, available)
audio_data = app.audio_codec.input_stream.read(
chunk_size,
exception_on_overflow=False
)
# 将字节数据转换为numpy数组进行处理
audio_array = np.frombuffer(audio_data, dtype=np.int16)
# 计算音量级别 (0.0-1.0)
# 16位音频的最大值是32768,计算音量占最大值的比例
# 使用均方根(RMS)值计算有效音量
rms = np.sqrt(np.mean(np.square(audio_array.astype(np.float32))))
# 标准化为0-1范围,32768是16位音频的最大值
# 增加放大系数以提高灵敏度
volume = min(1.0, rms / 32768 * 10) # 放大10倍使小音量更明显
# 应用平滑处理
if hasattr(self, '_last_volume'):
# 平滑过渡,保留70%上次数值,增加30%新数值
self._last_volume = self._last_volume * 0.7 + volume * 0.3
else:
self._last_volume = volume
return self._last_volume
except Exception as e:
self.logger.debug(f"读取麦克风数据失败: {e}")
except Exception as e:
self.logger.debug(f"获取麦克风音量失败: {e}")
# 如果无法获取实际音量,返回上次的音量或默认值
if hasattr(self, '_last_volume'):
# 缓慢衰减上次的音量
self._last_volume *= 0.9
return self._last_volume
else:
self._last_volume = 0.0 # 初始化为 0
return self._last_volume
def _start_mic_visualization(self):
"""开始麦克风可视化"""
if self.mic_visualizer and self.mic_timer and self.audio_control_stack:
self.is_listening = True
# 切换到麦克风可视化页面
self.audio_control_stack.setCurrentWidget(self.mic_page)
# 启动定时器更新可视化
if not self.mic_timer.isActive():
self.mic_timer.start(50) # 20fps
def _stop_mic_visualization(self):
"""停止麦克风可视化"""
self.is_listening = False
# 停止定时器
if self.mic_timer and self.mic_timer.isActive():
self.mic_timer.stop()
# 重置可视化音量
if self.mic_visualizer:
self.mic_visualizer.set_volume(0.0)
# 确保动画平滑过渡到0
if hasattr(self, '_last_volume'):
self._last_volume = 0.0
# 切换回音量控制页面
if self.audio_control_stack:
self.audio_control_stack.setCurrentWidget(self.volume_page)
def _on_send_button_click(self):
"""处理发送文本按钮点击事件"""
if not self.text_input or not self.send_text_callback:
return
text = self.text_input.text().strip()
if not text:
return
# 清空输入框
self.text_input.clear()
# 获取应用程序的事件循环并在其中运行协程
from src.application import Application
app = Application.get_instance()
if app and app.loop:
import asyncio
asyncio.run_coroutine_threadsafe(
self.send_text_callback(text),
app.loop
)
else:
self.logger.error("应用程序实例或事件循环不可用")
def _load_iot_devices(self):
"""加载并显示Home Assistant设备列表"""
try:
# 先清空现有设备列表
if hasattr(self, 'devices_list') and self.devices_list:
for widget in self.devices_list:
widget.deleteLater()
self.devices_list = []
# 清空设备状态标签引用
self.device_labels = {}
# 获取设备布局
if self.iot_card:
# 记录原来的标题文本,以便后面重新设置
title_text = ""
if self.history_title:
title_text = self.history_title.text()
# 设置self.history_title为None,以避免在清除旧布局时被删除导致引用错误
self.history_title = None
# 获取原布局并删除所有子控件
old_layout = self.iot_card.layout()
if old_layout:
# 清空布局中的所有控件
while old_layout.count():
item = old_layout.takeAt(0)
widget = item.widget()
if widget:
widget.deleteLater()
# 在现有布局中重新添加控件,而不是创建新布局
new_layout = old_layout
else:
# 如果没有现有布局,则创建一个新的
new_layout = QVBoxLayout()
self.iot_card.setLayout(new_layout)
# 重置布局属性
new_layout.setContentsMargins(2, 2, 2, 2) # 进一步减小外边距
new_layout.setSpacing(2) # 进一步减小控件间距
# 创建标题
self.history_title = QLabel(title_text)
self.history_title.setFont(QFont(self.app.font().family(), 12)) # 字体缩小
self.history_title.setAlignment(Qt.AlignCenter) # 居中对齐
self.history_title.setContentsMargins(5, 2, 0, 2) # 设置标题的边距
self.history_title.setMaximumHeight(25) # 减小标题高度
new_layout.addWidget(self.history_title)
# 尝试从配置文件加载设备列表
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
config_data = json.load(f)
devices = config_data.get("HOME_ASSISTANT", {}).get("DEVICES", [])
# 更新标题
self.history_title.setText(f"已连接设备 ({len(devices)})")
# 创建滚动区域
scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True)
scroll_area.setFrameShape(QFrame.NoFrame) # 移除边框
scroll_area.setStyleSheet("background: transparent;") # 透明背景
# 创建滚动区域的内容容器
container = QWidget()
container.setStyleSheet("background: transparent;") # 透明背景
# 创建网格布局,设置顶部对齐
grid_layout = QGridLayout(container)
grid_layout.setContentsMargins(3, 3, 3, 3) # 增加外边距
grid_layout.setSpacing(8) # 增加网格间距
grid_layout.setAlignment(Qt.AlignTop) # 设置顶部对齐
# 设置网格每行显示的卡片数量
cards_per_row = 3 # 每行显示3个设备卡片
# 遍历设备并添加到网格布局
for i, device in enumerate(devices):
entity_id = device.get('entity_id', '')
friendly_name = device.get('friendly_name', '')
# 解析friendly_name - 提取位置和设备名称
location = friendly_name
device_name = ""
if ',' in friendly_name:
parts = friendly_name.split(',', 1)
location = parts[0].strip()
device_name = parts[1].strip()
# 创建设备卡片 (使用QFrame替代CardWidget)
device_card = QFrame()
device_card.setMinimumHeight(90) # 增加最小高度
device_card.setMaximumHeight(150) # 增加最大高度以适应换行文本
device_card.setMinimumWidth(200) # 增加宽度
device_card.setProperty("entity_id", entity_id) # 存储entity_id
# 设置卡片样式 - 轻微背景色,圆角,阴影效果
device_card.setStyleSheet("""
QFrame {
border-radius: 5px;
background-color: rgba(255, 255, 255, 0.7);
border: none;
}
""")
card_layout = QVBoxLayout(device_card)
card_layout.setContentsMargins(10, 8, 10, 8) # 内边距
card_layout.setSpacing(2) # 控件间距
# 设备名称 - 显示在第一行(加粗)并允许换行
device_name_label = QLabel(f"<b>{device_name}</b>")
device_name_label.setFont(QFont(self.app.font().family(), 14))
device_name_label.setWordWrap(True) # 启用自动换行
device_name_label.setMinimumHeight(20) # 设置最小高度
device_name_label.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Minimum) # 水平扩展,垂直最小
card_layout.addWidget(device_name_label)
# 设备位置 - 显示在第二行(不加粗)
location_label = QLabel(f"{location}")
location_label.setFont(QFont(self.app.font().family(), 12))
location_label.setStyleSheet("color: #666666;")
card_layout.addWidget(location_label)
# 添加分隔线
line = QFrame()
line.setFrameShape(QFrame.HLine)
line.setFrameShadow(QFrame.Sunken)
line.setStyleSheet("background-color: #E0E0E0;")
line.setMaximumHeight(1)
card_layout.addWidget(line)
# 设备状态 - 根据设备类型设置不同的默认状态
state_text = "未知"
if "light" in entity_id:
state_text = "关闭"
status_display = f"状态: {state_text}"
elif "sensor" in entity_id:
if "temperature" in entity_id:
state_text = "0℃"
status_display = state_text
elif "humidity" in entity_id:
state_text = "0%"
status_display = state_text
else:
state_text = "正常"
status_display = f"状态: {state_text}"
elif "switch" in entity_id:
state_text = "关闭"
status_display = f"状态: {state_text}"
elif "button" in entity_id:
state_text = "可用"
status_display = f"状态: {state_text}"
else:
status_display = state_text
# 直接显示状态值
state_label = QLabel(status_display)
state_label.setFont(QFont(self.app.font().family(), 14))
state_label.setStyleSheet("color: #2196F3; border: none;") # 添加无边框样式
card_layout.addWidget(state_label)
# 保存状态标签引用
self.device_labels[entity_id] = state_label
# 计算行列位置
row = i // cards_per_row
col = i % cards_per_row
# 将卡片添加到网格布局
grid_layout.addWidget(device_card, row, col)
# 保存引用以便后续清理
self.devices_list.append(device_card)
# 设置滚动区域内容
container.setLayout(grid_layout)
scroll_area.setWidget(container)
# 将滚动区域添加到主布局
new_layout.addWidget(scroll_area)
# 设置滚动区域样式
scroll_area.setStyleSheet("""
QScrollArea {
border: none;
background-color: transparent;
}
QScrollBar:vertical {
border: none;
background-color: #F5F5F5;
width: 8px;
border-radius: 4px;
}
QScrollBar::handle:vertical {
background-color: #BDBDBD;
border-radius: 4px;
}
QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {
height: 0px;
}
""")
# 停止现有的更新定时器(如果存在)
if self.ha_update_timer and self.ha_update_timer.isActive():
self.ha_update_timer.stop()
# 创建并启动一个定时器,每1秒更新一次设备状态
self.ha_update_timer = QTimer()
self.ha_update_timer.timeout.connect(self._update_device_states)
self.ha_update_timer.start(1000) # 1秒更新一次
# 立即执行一次更新
self._update_device_states()
except Exception as e:
# 如果加载设备失败,创建一个错误提示布局
self.logger.error(f"读取设备配置失败: {e}")
self.history_title = QLabel("加载设备配置失败")
self.history_title.setFont(QFont(self.app.font().family(), 14, QFont.Bold))
self.history_title.setAlignment(Qt.AlignCenter)
new_layout.addWidget(self.history_title)
error_label = QLabel(f"错误信息: {str(e)}")
error_label.setWordWrap(True)
error_label.setStyleSheet("color: red;")
new_layout.addWidget(error_label)
except Exception as e:
self.logger.error(f"加载IOT设备失败: {e}", exc_info=True)
try:
# 在发生错误时尝试恢复界面
old_layout = self.iot_card.layout()
# 如果已有布局,清空它
if old_layout:
while old_layout.count():
item = old_layout.takeAt(0)
widget = item.widget()
if widget:
widget.deleteLater()
# 使用现有布局
new_layout = old_layout
else:
# 创建新布局
new_layout = QVBoxLayout()
self.iot_card.setLayout(new_layout)
self.history_title = QLabel("加载设备失败")
self.history_title.setFont(QFont(self.app.font().family(), 14, QFont.Bold))
self.history_title.setAlignment(Qt.AlignCenter)
new_layout.addWidget(self.history_title)
error_label = QLabel(f"错误信息: {str(e)}")
error_label.setWordWrap(True)
error_label.setStyleSheet("color: red;")
new_layout.addWidget(error_label)
except Exception as e2:
self.logger.error(f"恢复界面失败: {e2}", exc_info=True)
def _update_device_states(self):
"""更新Home Assistant设备状态"""
# 检查当前是否在IOT界面
if not self.stackedWidget or self.stackedWidget.currentIndex() != 1:
return
# 读取配置文件获取Home Assistant连接信息
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
config_data = json.load(f)
ha_options = config_data.get("HOME_ASSISTANT", {})
ha_url = ha_options.get("URL", "")
ha_token = ha_options.get("TOKEN", "")
if not ha_url or not ha_token:
self.logger.warning("Home Assistant URL或Token未配置,无法更新设备状态")
return
# 为每个设备查询状态
for entity_id, label in self.device_labels.items():
threading.Thread(
target=self._fetch_device_state,
args=(ha_url, ha_token, entity_id, label),
daemon=True
).start()
except Exception as e:
self.logger.error(f"更新Home Assistant设备状态失败: {e}", exc_info=True)
def _fetch_device_state(self, ha_url, ha_token, entity_id, label):
"""获取单个设备的状态"""
import requests
try:
# 构造API请求URL
api_url = f"{ha_url}/api/states/{entity_id}"
headers = {
"Authorization": f"Bearer {ha_token}",
"Content-Type": "application/json"
}
# 发送请求
response = requests.get(api_url, headers=headers, timeout=5)
if response.status_code == 200:
state_data = response.json()
state = state_data.get("state", "unknown")
# 更新设备状态
self.device_states[entity_id] = state
# 更新UI
self._update_device_ui(entity_id, state, label)
else:
self.logger.warning(f"获取设备状态失败: {entity_id}, 状态码: {response.status_code}")
except requests.RequestException as e:
self.logger.error(f"请求Home Assistant API失败: {e}")
except Exception as e:
self.logger.error(f"处理设备状态时出错: {e}")
def _update_device_ui(self, entity_id, state, label):
"""更新设备UI显示"""
# 在主线程中执行UI更新
self.update_queue.put(lambda: self._safe_update_device_label(entity_id, state, label))
def _safe_update_device_label(self, entity_id, state, label):
"""安全地更新设备状态标签"""
if not label or self.root.isHidden():
return
try:
display_state = state # 默认显示原始状态
# 根据设备类型格式化状态显示
if "light" in entity_id or "switch" in entity_id:
if state == "on":
display_state = "状态: 开启"
label.setStyleSheet("color: #4CAF50; border: none;") # 绿色表示开启,无边框
else:
display_state = "状态: 关闭"
label.setStyleSheet("color: #9E9E9E; border: none;") # 灰色表示关闭,无边框
elif "temperature" in entity_id:
try:
temp = float(state)
display_state = f"{temp:.1f}℃"
label.setStyleSheet("color: #FF9800; border: none;") # 橙色表示温度,无边框
except ValueError:
display_state = state
elif "humidity" in entity_id:
try:
humidity = float(state)
display_state = f"{humidity:.0f}%"
label.setStyleSheet("color: #03A9F4; border: none;") # 浅蓝色表示湿度,无边框
except ValueError:
display_state = state
elif "battery" in entity_id:
try:
battery = float(state)
display_state = f"{battery:.0f}%"
# 根据电池电量设置不同颜色
if battery < 20:
label.setStyleSheet("color: #F44336; border: none;") # 红色表示低电量,无边框
else:
label.setStyleSheet("color: #4CAF50; border: none;") # 绿色表示正常电量,无边框
except ValueError:
display_state = state
else:
display_state = f"状态: {state}"
label.setStyleSheet("color: #2196F3; border: none;") # 默认颜色,无边框
# 显示状态值
label.setText(f"{display_state}")
except RuntimeError as e:
self.logger.error(f"更新设备状态标签失败: {e}")
class MicrophoneVisualizer(QFrame):
"""麦克风音量可视化组件 - 波形显示版"""
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumHeight(50)
self.setFrameShape(QFrame.NoFrame)
# 初始化音量数据
self.current_volume = 0.0
self.target_volume = 0.0
# 波形历史数据(用于绘制波形图)- 增加历史点数使波形更平滑
self.history_max = 30 # 增加历史数据点数量
self.volume_history = [0.0] * self.history_max
# 创建平滑动画效果
self.animation_timer = QTimer()
self.animation_timer.timeout.connect(self._update_animation)
self.animation_timer.start(16) # 约60fps
# 颜色设置
self.min_color = QColor(80, 150, 255) # 低音量时的颜色 (蓝色)
self.max_color = QColor(255, 100, 100) # 高音量时的颜色 (红色)
self.current_color = self.min_color.name()
# 添加状态持续时间计数器,防止状态频繁变化
self.current_status = "安静" # 当前显示的状态
self.target_status = "安静" # 目标状态
self.status_hold_count = 0 # 状态保持计数器
self.status_threshold = 5 # 状态变化阈值(必须连续5帧达到阈值才切换状态)
# 透明背景
self.setStyleSheet("background-color: transparent;")
def set_volume(self, volume):
"""设置当前音量,范围0.0-1.0"""
# 确保音量在有效范围内
volume = max(0.0, min(1.0, volume))
self.target_volume = volume
# 更新历史数据(添加新值并移除最旧的值)
self.volume_history.append(volume)
if len(self.volume_history) > self.history_max:
self.volume_history.pop(0)
# 更新状态文本(带状态变化滞后)
volume_percent = int(volume * 100)
# 根据音量级别确定目标状态
if volume_percent < 5:
new_status = "静音"
elif volume_percent < 20:
new_status = "安静"
elif volume_percent < 50:
new_status = "正常"
elif volume_percent < 75:
new_status = "较大"
else:
new_status = "很大"
# 状态切换逻辑(带滞后性)
if new_status == self.target_status:
# 相同状态,增加计数
self.status_hold_count += 1
else:
# 不同状态,重置为新状态
self.target_status = new_status
self.status_hold_count = 0
# 只有当状态持续一定时间后才切换显示状态
if self.status_hold_count >= self.status_threshold:
self.current_status = self.target_status
self.update() # 触发重绘
def _update_animation(self):
"""更新动画效果"""
# 平滑过渡到目标音量 - 提高响应性
self.current_volume += (self.target_volume - self.current_volume) * 0.3
# 计算颜色过渡
r = int(self.min_color.red() + (self.max_color.red() - self.min_color.red()) * self.current_volume)
g = int(self.min_color.green() + (self.max_color.green() - self.min_color.green()) * self.current_volume)
b = int(self.min_color.blue() + (self.max_color.blue() - self.min_color.blue()) * self.current_volume)
self.current_color = QColor(r, g, b).name()
self.update()
def paintEvent(self, event):
"""绘制事件"""
super().paintEvent(event)
painter = QPainter(self)
painter.setRenderHint(QPainter.Antialiasing)
try:
# 获取绘制区域
rect = self.rect()
# 绘制波形图
self._draw_waveform(painter, rect)
# 添加音量状态文本
small_font = painter.font()
small_font.setPointSize(10)
painter.setFont(small_font)
painter.setPen(QColor(100, 100, 100))
# 在底部显示状态文本
status_rect = QRect(rect.left(), rect.bottom() - 20, rect.width(), 20)
# 使用稳定的状态文本显示
status_text = f"声音: {self.current_status}"
painter.drawText(status_rect, Qt.AlignCenter, status_text)
except Exception as e:
self.logger.error(f"绘制波形图失败: {e}") if hasattr(self, 'logger') else None
finally:
painter.end()
def _draw_waveform(self, painter, rect):
"""绘制波形图"""
# 如果没有足够的历史数据,返回
if len(self.volume_history) < 2:
return
# 波形图区域 - 扩大为几乎整个控件
wave_rect = QRect(rect.left() + 10, rect.top() + 10,
rect.width() - 20, rect.height() - 40)
# 设置半透明背景
bg_color = QColor(240, 240, 240, 30)
painter.setPen(Qt.NoPen)
painter.setBrush(QBrush(bg_color))
painter.drawRoundedRect(wave_rect, 5, 5)
# 设置波形图线条样式
wave_pen = QPen(QColor(self.current_color))
wave_pen.setWidth(2)
painter.setPen(wave_pen)
# 计算波形图点
history_len = len(self.volume_history)
point_interval = wave_rect.width() / (history_len - 1)
# 创建波形图路径
path = QPainterPath()
# 波形图起点(从左下角开始)
start_x = wave_rect.left()
mid_y = wave_rect.top() + wave_rect.height() / 2
# 平滑波形显示 - 减小振幅变化,让无声和小声时波形更平稳
amplitude_factor = 0.8 # 振幅因子
min_amplitude = 0.1 # 最小振幅(确保有轻微波动)
# 计算第一个点的y坐标
vol = self.volume_history[0]
amp = max(min_amplitude, vol) * amplitude_factor # 确保最小振幅
start_y = mid_y - amp * wave_rect.height() / 2
path.moveTo(start_x, start_y)
# 添加波形点
for i in range(1, history_len):
x = start_x + i * point_interval
# 获取当前音量
vol = self.volume_history[i]
# 计算振幅(上下波动),确保最小振幅以保持波形的可见性
amp = max(min_amplitude, vol) * amplitude_factor
# 添加正弦波动,使波形更自然
wave_phase = i / 2.0 # 波相位
sine_factor = 0.08 * amp # 正弦波因子随音量变化
sine_wave = sine_factor * np.sin(wave_phase)
y = mid_y - (amp * wave_rect.height() / 2 + sine_wave * wave_rect.height())
# 使用曲线连接点,使波形更平滑
if i > 1:
# 使用二次贝塞尔曲线,需要一个控制点
ctrl_x = start_x + (i - 0.5) * point_interval
prev_vol = self.volume_history[i-1]
prev_amp = max(min_amplitude, prev_vol) * amplitude_factor
prev_sine = sine_factor * np.sin((i-1) / 2.0)
ctrl_y = mid_y - (prev_amp * wave_rect.height() / 2 + prev_sine * wave_rect.height())
path.quadTo(ctrl_x, ctrl_y, x, y)
else:
# 第一个点直接连接
path.lineTo(x, y)
# 绘制波形路径
painter.drawPath(path)
# 添加渐变效果
# 创建从波形底部到顶部的渐变
gradient = QLinearGradient(
wave_rect.left(), wave_rect.top() + wave_rect.height(),
wave_rect.left(), wave_rect.top()
)
# 根据当前音量设置渐变颜色
gradient.setColorAt(0, QColor(self.current_color).lighter(140))
gradient.setColorAt(0.5, QColor(self.current_color))
gradient.setColorAt(1, QColor(self.current_color).darker(140))
# 保存画家状态
painter.save()
# 创建反射路径(波形的镜像)
reflect_path = QPainterPath(path)
# 将路径向下移动,创建反射效果
transform = QTransform()
transform.translate(0, wave_rect.height() / 4)
reflect_path = transform.map(reflect_path)
# 设置半透明画笔绘制反射
reflect_pen = QPen()
reflect_pen.setWidth(1)
reflect_pen.setColor(QColor(self.current_color).lighter(160))
painter.setPen(reflect_pen)
# 设置透明度
painter.setOpacity(0.3)
# 绘制反射波形
painter.drawPath(reflect_path)
# 恢复画家状态
painter.restore()
# 添加音量百分比小浮标
if self.current_volume > 0.1: # 只有当音量足够大时才显示
percent_text = f"{int(self.current_volume * 100)}%"
painter.setPen(QColor(self.current_color).darker(120))
# 字体大小随音量变化
font = painter.font()
font.setPointSize(8 + int(self.current_volume * 4)) # 8-12pt
font.setBold(True)
painter.setFont(font)
# 在波形最右侧显示百分比
right_edge = wave_rect.right() - 40
y_position = mid_y - amp * wave_rect.height() / 3 # 根据当前振幅定位
# 使用QPoint而不是单独的x,y坐标,或者将浮点数转为整数
# Windows下QPainter.drawText对坐标类型要求更严格
painter.drawText(int(right_edge), int(y_position), percent_text)