|
"""Clip audio/video by ASR transcript.

VideoClipper runs an ASR pipeline over an audio stream (or a video's audio
track), then cuts out the segments whose recognized text matches a given
destination text, optionally burning subtitles into the clipped video.
"""

import copy
import logging
import os

import librosa
import moviepy.editor as mpy
import numpy as np
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from moviepy.editor import CompositeVideoClip, TextClip, concatenate_videoclips
from moviepy.video.tools.subtitles import SubtitlesClip

from subtitle_utils import generate_srt, generate_srt_clip
from trans_utils import pre_proc, proc


class VideoClipper:
    def __init__(self, asr_pipeline):
        logging.warning("Initializing VideoClipper.")
        self.asr_pipeline = asr_pipeline

    def recog(self, audio_input, state=None):
        # audio_input is a (sample_rate, waveform) tuple; the pipeline
        # expects 16 kHz mono float data.
        if state is None:
            state = {}
        state['audio_input'] = audio_input
        _, data = audio_input
        data = data.astype(np.float64)
        rec_result = self.asr_pipeline(audio_in=data)
        state['recog_res_raw'] = rec_result['text_postprocessed']
        state['timestamp'] = rec_result['time_stamp']
        state['sentences'] = rec_result['sentences']
        res_text = rec_result['text']
        res_srt = generate_srt(rec_result['sentences'])
        return res_text, res_srt, state

    def clip(self, dest_text, start_ost, end_ost, state):
        # start_ost / end_ost are offsets in milliseconds; at 16 kHz one
        # millisecond is 16 samples, hence the *16 factors below.
        audio_input = state['audio_input']
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        sr, data = audio_input
        data = data.astype(np.float64)

        # '#' separates multiple destination texts; collect the matched
        # timestamp ranges (in samples) for all of them.
        all_ts = []
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            all_ts.extend(proc(recog_res_raw, timestamp, _dest_text))
        ts = all_ts
        srt_index = 0
        clip_srt = ""
        if len(ts):
            start, end = ts[0]
            start = min(max(0, start + start_ost * 16), len(data))
            end = min(max(0, end + end_ost * 16), len(data))
            res_audio = data[start:end]
            start_end_info = "from {} to {}".format(start / 16000, end / 16000)
            srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index)
            clip_srt += srt_clip
            for _ts in ts[1:]:
                start, end = _ts
                start = min(max(0, start + start_ost * 16), len(data))
                end = min(max(0, end + end_ost * 16), len(data))
                start_end_info += ", from {} to {}".format(start / 16000, end / 16000)
                # The offsets are already applied and clamped above, so
                # slice with start/end directly.
                res_audio = np.concatenate([res_audio, data[start:end]], -1)
                srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index - 1)
                clip_srt += srt_clip
            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
        else:
            # Nothing matched: fall back to the raw audio so the return
            # value stays well defined.
            res_audio = data
            message = ("No period found in the speech, returning the raw speech. "
                       "You may check the recognition result and try another destination text.")
        return (sr, res_audio), message, clip_srt

    def video_recog(self, video_filename):
        # Extract the audio track to a wav file, resample to 16 kHz,
        # and run recognition on it.
        base_name, _ = os.path.splitext(video_filename)
        clip_video_file = base_name + '_clip.mp4'
        video = mpy.VideoFileClip(video_filename)
        audio_file = base_name + '.wav'
        video.audio.write_audiofile(audio_file)
        wav = librosa.load(audio_file, sr=16000)[0]
        state = {
            'video_filename': video_filename,
            'clip_video_file': clip_video_file,
            'video': video,
        }
        return self.recog((16000, wav), state)

    def video_clip(self, dest_text, start_ost, end_ost, state, font_size=32, font_color='white', add_sub=False):
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        video = state['video']
        clip_video_file = state['clip_video_file']
        video_filename = state['video_filename']

        # '#' separates multiple destination texts; collect the matched
        # timestamp ranges (in samples) for all of them.
        all_ts = []
        srt_index = 0
        for _dest_text in dest_text.split('#'):
            _dest_text = pre_proc(_dest_text)
            all_ts.extend(proc(recog_res_raw, timestamp, _dest_text))
        ts = all_ts
        clip_srt = ""
        if add_sub:
            # One TextClip generator is enough; it is reused for every
            # subtitle overlay below.
            generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
        if len(ts):
            # Timestamps are sample indices at 16 kHz; offsets are in ms.
            start, end = ts[0][0] / 16000, ts[0][1] / 16000
            start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
            video_clip = video.subclip(start, end)
            start_end_info = "from {} to {}".format(start, end)
            srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index)
            clip_srt += srt_clip
            if add_sub:
                subtitles = SubtitlesClip(subs, generator)
                video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center', 'bottom'))])
            concat_clips = [video_clip]
            for _ts in ts[1:]:
                start, end = _ts[0] / 16000, _ts[1] / 16000
                start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
                _video_clip = video.subclip(start, end)
                start_end_info += ", from {} to {}".format(start, end)
                srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index - 1)
                clip_srt += srt_clip
                if add_sub:
                    subtitles = SubtitlesClip(subs, generator)
                    _video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center', 'bottom'))])
                concat_clips.append(copy.copy(_video_clip))
            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
            logging.warning("Concatenating...")
            if len(concat_clips) > 1:
                video_clip = concatenate_videoclips(concat_clips)
            video_clip.write_videofile(clip_video_file)
        else:
            # Nothing matched: return the original video untouched.
            clip_video_file = video_filename
            message = ("No period found in the audio, returning the raw video. "
                       "You may check the recognition result and try another destination text.")
        return clip_video_file, message, clip_srt
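

# Minimal usage sketch (an illustration, not part of the original module):
# it builds a ModelScope ASR pipeline and clips a video around a target
# phrase. The model id below is an example Paraformer model and is an
# assumption; swap in whichever 16 kHz ASR model your setup actually uses,
# and replace 'example.mp4' and the destination text with real inputs.
if __name__ == '__main__':
    asr_pipeline = pipeline(
        task=Tasks.auto_speech_recognition,
        # Example model id (assumption); any compatible 16 kHz ASR model works.
        model='damo/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
    )
    clipper = VideoClipper(asr_pipeline)
    # First pass: recognize the video's audio track and build the state dict.
    res_text, res_srt, state = clipper.video_recog('example.mp4')
    # Second pass: cut the segments matching the destination text, with zero
    # start/end offsets (in ms) and subtitles burned in.
    clip_file, message, clip_srt = clipper.video_clip('destination text', 0, 0, state, add_sub=True)
    print(message)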