|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import re |
|
|
|
import librosa |
|
import numpy as np |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def is_silent(data): |
|
if np.abs(data).max() < 3e-3: |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
def sentence_end(txt): |
|
for c in [".", "。", "!", "?", "!", "?"]: |
|
if c in txt: |
|
if c == ".": |
|
idx = txt.find(c) |
|
if idx > 0: |
|
if txt[idx - 1].isdigit(): |
|
continue |
|
return c |
|
return "" |
|
|
|
|
|
class NumberToTextConverter: |
|
r""" |
|
A helper class to ensure text-to-speech (TTS) systems read numeric digits |
|
in the desired language (Chinese or English) digit-by-digit. It forcibly |
|
replaces all numeric substrings in text with their language-specific |
|
textual representations, thereby reducing the likelihood of TTS mistakes |
|
on numbers. |
|
Note: MiniCPM-o 2.6 only use this in streaming mode. |
|
|
|
Attributes: |
|
num_to_chinese (dict): |
|
Mapping from digit (str) to its Chinese textual form (str). |
|
num_to_english (dict): |
|
Mapping from digit (str) to its English textual form (str). |
|
|
|
Example: |
|
>>> converter = NumberToTextConverter() |
|
>>> converter.replace_numbers_with_text("我有2个苹果", language="chinese") |
|
'我有两个苹果' |
|
>>> converter.replace_numbers_with_text("I have 23 books", language="english") |
|
'I have two three books' |
|
""" |
|
|
|
def __init__(self): |
|
self.num_to_chinese = { |
|
"0": "零", |
|
"1": "一", |
|
"2": "二", |
|
"3": "三", |
|
"4": "四", |
|
"5": "五", |
|
"6": "六", |
|
"7": "七", |
|
"8": "八", |
|
"9": "九", |
|
} |
|
self.num_to_english = { |
|
"0": "zero", |
|
"1": "one", |
|
"2": "two", |
|
"3": "three", |
|
"4": "four", |
|
"5": "five", |
|
"6": "six", |
|
"7": "seven", |
|
"8": "eight", |
|
"9": "nine", |
|
} |
|
|
|
def number_to_chinese_digit_by_digit(self, num_str): |
|
result = "" |
|
for char in num_str: |
|
if char in self.num_to_chinese: |
|
result += self.num_to_chinese[char] |
|
return result |
|
|
|
def number_to_english_digit_by_digit(self, num_str): |
|
result = [] |
|
for char in num_str: |
|
if char in self.num_to_english: |
|
result.append(self.num_to_english[char]) |
|
return " ".join(result) |
|
|
|
def detect_language(self, text): |
|
chinese_count = len(re.findall(r"[\u4e00-\u9fff]", text)) |
|
english_count = len(re.findall(r"[a-zA-Z]", text)) |
|
return "chinese" if chinese_count >= english_count else "english" |
|
|
|
def replace_numbers_with_text(self, text, language=None): |
|
if language is None: |
|
language = self.detect_language(text) |
|
numbers = re.findall(r"\d+", text) |
|
|
|
for num in numbers: |
|
if language == "chinese": |
|
replacement = self.number_to_chinese_digit_by_digit(num) |
|
else: |
|
replacement = self.number_to_english_digit_by_digit(num) |
|
text = text.replace(num, replacement, 1) |
|
|
|
return text |
|
|
|
|
|
class VoiceChecker: |
|
r""" |
|
A simple utility class to detect silence or low variation in consecutive audio chunks by comparing |
|
the mel-spectrogram distances. It keeps track of consecutive zero-distance and low-distance chunks |
|
to decide if the audio is considered "bad" (e.g., overly silent or not changing enough). |
|
|
|
Attributes: |
|
previous_mel (`np.ndarray` or `None`): |
|
Holds the previously observed mel-spectrogram in decibel scale. Used to compute |
|
the next distance; reset via :meth:`reset`. |
|
consecutive_zeros (`int`): |
|
The number of consecutive chunks that were detected as silent (distance = 0). |
|
consecutive_low_distance (`int`): |
|
The number of consecutive chunks whose distance was below the threshold. |
|
|
|
Example: |
|
>>> checker = VoiceChecker() |
|
>>> # Suppose we have audio_wav (list or np.ndarray) and mel_spec (np.ndarray) |
|
>>> # We split them into chunks and call checker.is_bad(...) |
|
>>> is_audio_bad = checker.is_bad(audio_wav, mel_spec, chunk_size=2560, thresh=100.0) |
|
>>> if is_audio_bad: |
|
... print("Audio deemed bad!") |
|
>>> # Reset states if needed |
|
>>> checker.reset() |
|
""" |
|
|
|
def __init__(self): |
|
self.previous_mel = None |
|
self.consecutive_zeros = 0 |
|
self.consecutive_low_distance = 0 |
|
|
|
def compute_distance(self, audio_chunk, mel_spec): |
|
if is_silent(audio_chunk): |
|
return 0.0 |
|
|
|
mel_db = librosa.power_to_db(mel_spec) |
|
if self.previous_mel is None: |
|
self.previous_mel = mel_db |
|
return -1.0 |
|
|
|
distance = np.linalg.norm(np.mean(mel_db, axis=1) - np.mean(self.previous_mel, axis=1)) |
|
self.previous_mel = mel_db |
|
return distance |
|
|
|
def is_bad(self, audio_wav, mel_spec, chunk_size=2560, thresh=100.0): |
|
num_chunks = len(audio_wav) // chunk_size |
|
mel_chunk_size = mel_spec.shape[-1] // num_chunks |
|
for i in range(num_chunks): |
|
audio_chunk = audio_wav[i * chunk_size : (i + 1) * chunk_size] |
|
mel_spec_chunk = mel_spec[:, i * mel_chunk_size : (i + 1) * mel_chunk_size] |
|
|
|
distance = self.compute_distance(audio_chunk, mel_spec_chunk) |
|
logger.warning( |
|
f"mel dist: {distance:.1f}, zero: {self.consecutive_zeros}, low: {self.consecutive_low_distance}" |
|
) |
|
if distance == 0: |
|
self.consecutive_low_distance = 0 |
|
self.consecutive_zeros += 1 |
|
if self.consecutive_zeros >= 12: |
|
logger.warning("VoiceChecker detected 1.2 s silent. Marking as failed.") |
|
return True |
|
elif distance < thresh: |
|
self.consecutive_zeros = 0 |
|
self.consecutive_low_distance += 1 |
|
if self.consecutive_low_distance >= 5: |
|
logger.warning("VoiceChecker detected 5 consecutive low distance chunks. Marking as failed.") |
|
return True |
|
else: |
|
self.consecutive_low_distance = 0 |
|
self.consecutive_zeros = 0 |
|
|
|
return False |
|
|
|
def reset(self): |
|
self.previous_mel = None |
|
self.consecutive_zeros = 0 |
|
self.consecutive_low_distance = 0 |
|
|