xiaozhi / src /audio_codecs /audio_codec.py
nzjsdsk's picture
Upload 169 files
27e74f3 verified
import queue
import numpy as np
import pyaudio
import opuslib
import time
import threading
from src.constants.constants import AudioConfig
from src.utils.logging_config import get_logger
logger = get_logger(__name__)
class AudioCodec:
"""音频编解码器类,处理音频的录制和播放(严格兼容版)"""
def __init__(self):
self.audio = None
self.input_stream = None
self.output_stream = None
self.opus_encoder = None
self.opus_decoder = None
self.audio_decode_queue = queue.Queue()
# 状态管理(保留原始变量名)
self._is_closing = False
self._is_input_paused = False
self._input_paused_lock = threading.Lock()
self._stream_lock = threading.Lock()
# 新增设备索引缓存
self._cached_input_device = -1
self._cached_output_device = -1
self._initialize_audio()
def _initialize_audio(self):
try:
self.audio = pyaudio.PyAudio()
# 缓存设备索引
self._cached_input_device = self._get_default_or_first_available_device(True)
self._cached_output_device = self._get_default_or_first_available_device(False)
# 初始化流(优化实现)
self.input_stream = self._create_stream(is_input=True)
self.output_stream = self._create_stream(is_input=False)
# 编解码器初始化(保持原始参数)
self.opus_encoder = opuslib.Encoder(
AudioConfig.INPUT_SAMPLE_RATE,
AudioConfig.CHANNELS,
AudioConfig.OPUS_APPLICATION
)
self.opus_decoder = opuslib.Decoder(
AudioConfig.OUTPUT_SAMPLE_RATE,
AudioConfig.CHANNELS
)
logger.info("音频设备和编解码器初始化成功")
except Exception as e:
logger.error(f"初始化音频设备失败: {e}")
self.close()
raise
def _get_default_or_first_available_device(self, is_input=True):
"""设备选择逻辑(优化异常处理)"""
try:
device = self.audio.get_default_input_device_info() if is_input else \
self.audio.get_default_output_device_info()
logger.info(f"使用默认设备: {device['name']} (Index: {device['index']})")
return device["index"]
except OSError:
logger.warning("默认设备不可用,查找替代设备...")
for i in range(self.audio.get_device_count()):
dev = self.audio.get_device_info_by_index(i)
if is_input and dev["maxInputChannels"] > 0:
logger.info(f"使用替代输入设备: {dev['name']} (Index: {i})")
return i
if not is_input and dev["maxOutputChannels"] > 0:
logger.info(f"使用替代输出设备: {dev['name']} (Index: {i})")
return i
raise RuntimeError("没有可用的音频设备")
def _create_stream(self, is_input=True):
"""流创建逻辑(新增设备缓存)"""
params = {
"format": pyaudio.paInt16,
"channels": AudioConfig.CHANNELS,
"rate": AudioConfig.INPUT_SAMPLE_RATE if is_input else AudioConfig.OUTPUT_SAMPLE_RATE,
"input" if is_input else "output": True,
"frames_per_buffer": AudioConfig.INPUT_FRAME_SIZE if is_input else AudioConfig.OUTPUT_FRAME_SIZE,
"start": False
}
# 使用缓存设备索引
if is_input:
params["input_device_index"] = self._cached_input_device
else:
params["output_device_index"] = self._cached_output_device
return self.audio.open(**params)
def _reinitialize_input_stream(self):
"""输入流重建(优化设备缓存)"""
if self._is_closing:
return
try:
# 刷新设备缓存
self._cached_input_device = self._get_default_or_first_available_device(True)
if self.input_stream:
try:
self.input_stream.stop_stream()
self.input_stream.close()
except Exception:
pass
self.input_stream = self._create_stream(is_input=True)
self.input_stream.start_stream()
logger.info("音频输入流重新初始化成功")
except Exception as e:
logger.error(f"输入流重建失败: {e}")
raise
def _reinitialize_output_stream(self):
"""输出流重建(优化设备缓存)"""
if self._is_closing:
return
try:
# 刷新设备缓存
self._cached_output_device = self._get_default_or_first_available_device(False)
if self.output_stream:
try:
self.output_stream.stop_stream()
self.output_stream.close()
except Exception:
pass
self.output_stream = self._create_stream(is_input=False)
self.output_stream.start_stream()
logger.info("音频输出流重新初始化成功")
except Exception as e:
logger.error(f"输出流重建失败: {e}")
raise
def pause_input(self):
with self._input_paused_lock:
self._is_input_paused = True
logger.info("音频输入已暂停")
def resume_input(self):
with self._input_paused_lock:
self._is_input_paused = False
logger.info("音频输入已恢复")
def is_input_paused(self):
with self._input_paused_lock:
return self._is_input_paused
def read_audio(self):
"""(优化缓冲区管理)"""
if self.is_input_paused():
return None
try:
with self._stream_lock:
# 流状态检查优化
if not self.input_stream or not self.input_stream.is_active():
self._reinitialize_input_stream()
if not self.input_stream.is_active():
return None
# 动态缓冲区调整 - 实时性能优化
available = self.input_stream.get_read_available()
if available > AudioConfig.INPUT_FRAME_SIZE * 2: # 降低阈值从3倍到2倍
skip_samples = available - (AudioConfig.INPUT_FRAME_SIZE * 1.5) # 减少保留量
if skip_samples > 0: # 增加安全检查
self.input_stream.read(
int(skip_samples), # 确保整数
exception_on_overflow=False
)
logger.debug(f"跳过{skip_samples}个样本减少延迟")
# 读取数据
data = self.input_stream.read(
AudioConfig.INPUT_FRAME_SIZE,
exception_on_overflow=False
)
# 数据验证
if len(data) != AudioConfig.INPUT_FRAME_SIZE * 2:
logger.warning("音频数据长度异常,重置输入流")
self._reinitialize_input_stream()
return None
return self.opus_encoder.encode(data, AudioConfig.INPUT_FRAME_SIZE)
except Exception as e:
logger.error(f"音频读取失败: {e}")
self._reinitialize_input_stream()
return None
def play_audio(self):
"""(优化批量处理)"""
try:
if self.audio_decode_queue.empty():
return
# 批量解码优化
batch_size = min(10, self.audio_decode_queue.qsize())
buffer = bytearray()
for _ in range(batch_size):
try:
opus_data = self.audio_decode_queue.get_nowait()
pcm = self.opus_decoder.decode(opus_data, AudioConfig.OUTPUT_FRAME_SIZE)
buffer.extend(pcm)
except queue.Empty:
break
except opuslib.OpusError as e:
logger.error(f"解码失败: {e}")
if buffer:
# 优化写入流程
with self._stream_lock:
if self.output_stream and self.output_stream.is_active():
try:
self.output_stream.write(np.frombuffer(buffer, dtype=np.int16).tobytes())
except OSError as e:
if "Stream closed" in str(e):
self._reinitialize_output_stream()
self.output_stream.write(buffer)
except Exception as e:
logger.error(f"播放失败: {e}")
self._reinitialize_output_stream()
def close(self):
"""(优化资源释放顺序和线程安全性)"""
if self._is_closing:
return
self._is_closing = True
logger.info("开始关闭音频编解码器...")
try:
# 清空队列先行处理
self.clear_audio_queue()
# 安全停止和关闭流
with self._stream_lock:
# 先关闭输入流
if self.input_stream:
try:
if hasattr(self.input_stream, 'is_active') and self.input_stream.is_active():
self.input_stream.stop_stream()
self.input_stream.close()
except Exception as e:
logger.warning(f"关闭输入流失败: {e}")
finally:
self.input_stream = None
# 再关闭输出流
if self.output_stream:
try:
if hasattr(self.output_stream, 'is_active') and self.output_stream.is_active():
self.output_stream.stop_stream()
self.output_stream.close()
except Exception as e:
logger.warning(f"关闭输出流失败: {e}")
finally:
self.output_stream = None
# 最后释放PyAudio
if self.audio:
try:
self.audio.terminate()
except Exception as e:
logger.warning(f"释放PyAudio失败: {e}")
finally:
self.audio = None
# 清理编解码器
self.opus_encoder = None
self.opus_decoder = None
logger.info("音频资源已完全释放")
except Exception as e:
logger.error(f"关闭音频编解码器过程中发生错误: {e}")
finally:
self._is_closing = False
def write_audio(self, opus_data):
self.audio_decode_queue.put(opus_data)
def has_pending_audio(self):
return not self.audio_decode_queue.empty()
def wait_for_audio_complete(self, timeout=5.0):
start = time.time()
while self.has_pending_audio() and time.time() - start < timeout:
time.sleep(0.1)
def clear_audio_queue(self):
with self._stream_lock:
while not self.audio_decode_queue.empty():
try:
self.audio_decode_queue.get_nowait()
except queue.Empty:
break
def start_streams(self):
for stream in [self.input_stream, self.output_stream]:
if stream and not stream.is_active():
try:
stream.start_stream()
except OSError as e:
logger.error(f"启动失败: {e}")
def stop_streams(self):
"""安全停止流(优化错误处理)"""
with self._stream_lock:
for name, stream in [("输入", self.input_stream), ("输出", self.output_stream)]:
if stream:
try:
# 使用hasattr避免在流已关闭情况下调用is_active
if hasattr(stream, 'is_active') and stream.is_active():
stream.stop_stream()
except Exception as e:
# 使用warning级别,因为这不是严重错误
logger.warning(f"停止{name}流失败: {e}")
def __del__(self):
self.close()