"""Generate short videos from a text prompt.

GPT (via a forced function call) turns the user's message into scene
descriptions; each scene is rendered with animate-diff on Replicate across
worker threads (round-robin over several API tokens), downloaded, and
concatenated into a single mp4 with moviepy.
"""

import gradio as gr
import openai
import sys
import os
import json
import threading
import time
import requests
import argparse
import markdown2
import uuid
from pathlib import Path
from dotenv import load_dotenv
from IPython.display import Image
from moviepy.editor import VideoFileClip, concatenate_videoclips, ImageClip
from moviepy.video.fx.all import fadein, fadeout
from PIL import Image as PIL_Image
from jinja2 import Template

ENV = os.getenv("ENV")
MODEL = "gpt-3.5-turbo"
# MODEL = "gpt-4"

load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')

# REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")
# Load REPLICATE_API_TOKEN_LIST and split it on commas into a list of tokens.
REPLICATE_API_TOKEN_LIST = os.getenv("REPLICATE_API_TOKEN_LIST").split(',')
REPLICATE_API_TOKEN_INDEX = 0  # initialize the token index
NUMBER_OF_SCENES = os.getenv("NUMBER_OF_SCENES")

# Use the real Replicate client in production, a local stub otherwise.
if ENV == "PRODUCTION":
    import replicate
else:
    from stub import replicate


class Video:
    """One animate-diff generation job for a single scene.

    Builds the positive/negative prompts from the scene dict, runs the model
    on Replicate, and downloads the resulting clip to ``self.file_path``.
    """

    # Maximum number of retries after a "resource not found" error.
    MAX_RETRIES = 2

    def __init__(self, scene, index, token_controller):
        self.token_controller = token_controller
        self.scene = scene
        self.prompt = "masterpiece, awards, best quality, dramatic-lighting, "
        self.prompt = self.prompt + scene.get("visual_prompt_in_en")
        self.prompt = self.prompt + ", cinematic-angles-" + scene.get("cinematic_angles")
        # NOTE(review): "nagative" is a misspelling kept for compatibility.
        self.nagative_prompt = (
            "badhandv4, easynegative, ng_deepnegative_v1_75t, "
            "verybadimagenegative_v1.3, bad-artist, bad_prompt_version2-neg, nsfw, "
        )
        self.index = index
        self.output_url = None
        self.video_id = uuid.uuid4()
        self.file_path = f"assets/thread_{index}_request_{self.video_id}_video.mp4"

    def run_replicate(self, retries=0):
        """Run animate-diff on Replicate and save the output clip.

        Retries up to ``MAX_RETRIES`` times (2s apart) when Replicate reports
        the requested resource as missing; other errors are logged and the
        thread ends.
        """
        try:
            self.token = self.token_controller.get_next_token()
            start_time = time.time()
            # NOTE(review): mutating the process-wide env var from concurrent
            # worker threads races — confirm replicate reads it per call.
            os.environ["REPLICATE_API_TOKEN"] = self.token
            # Log only the first 10 characters of the token.
            print(f"Thread {self.index} token: {self.token[:10]}")
            self.output_url = replicate.run(
                "lucataco/animate-diff:1531004ee4c98894ab11f8a4ce6206099e732c1da15121987a8eef54828f0663",
                input={
                    "motion_module": "mm_sd_v14",
                    "prompt": self.prompt,
                    "n_prompt": self.nagative_prompt,
                    "seed": 0,
                }
            )
            end_time = time.time()
            duration = end_time - start_time
            self.download_and_save(url=self.output_url, file_path=self.file_path)
            self.print_thread_info(start_time, end_time, duration)
        except replicate.exceptions.ReplicateError as e:
            if str(e) == "The requested resource could not be found." and retries < self.MAX_RETRIES:
                print("リソースが見つからないエラーが発生しました。2秒後に再試行します。")
                time.sleep(2)
                self.run_replicate(retries + 1)  # retry by calling ourselves recursively
            elif retries >= self.MAX_RETRIES:
                print("最大再試行回数に達しました。スレッドを終了します。")
                # additional handling when the retry limit is reached goes here
            else:
                print("予期しないエラーが発生しました。スレッドを終了します。")
                # additional handling for unexpected Replicate errors goes here
        except Exception as e:
            print(f"Error in thread {self.index}: {e}")

    def download_and_save(self, url, file_path):
        """Download *url* and write the response bytes to *file_path*."""
        response = requests.get(url)
        with open(file_path, "wb") as f:
            f.write(response.content)

    def print_thread_info(self, start_time, end_time, duration):
        """Log the output URL and timing info for this generation run."""
        print(f"Thread {self.index} output_url: {self.output_url}")
        print(f"Thread {self.index} start time: {start_time}")
        print(f"Thread {self.index} end time: {end_time}")
        print(f"Thread {self.index} duration: {duration}")


class ThreadController:
    """Fans scene generation out over threads, rotating Replicate API tokens."""

    def __init__(self, args):
        self.args = args
        scenes = args.get("scenes")
        self.videos = []
        self.threads = []
        self.token_index = 0
        self.lock = threading.Lock()
        # NOTE(review): this creates one Video per (scene, token) pair, so
        # every scene is generated len(REPLICATE_API_TOKEN_LIST) times —
        # confirm that duplication is intended.
        for index, scene in enumerate(scenes):
            for _ in REPLICATE_API_TOKEN_LIST:
                # token = REPLICATE_API_TOKEN_LIST[self.token_index]
                video = Video(scene, index, self)
                self.videos.append(video)
                self.token_index = (self.token_index + 1) % len(REPLICATE_API_TOKEN_LIST)

    def run_threads(self):
        """Start one worker thread per video and wait for all to finish."""
        os.makedirs("assets", exist_ok=True)
        for video in self.videos:
            thread = threading.Thread(target=video.run_replicate)
            self.threads.append(thread)
            thread.start()
            # wait one second before launching the next thread
            # time.sleep(1)
        for thread in self.threads:
            thread.join()

    def merge_videos(self):
        """Concatenate all downloaded clips into one mp4; return its path.

        Missing clip files are logged and skipped rather than aborting.
        """
        clips = []
        for video in self.videos:
            video_path = Path(video.file_path)
            if video_path.exists():
                clips.append(VideoFileClip(video.file_path))
            else:
                print(f"Error: Video file {video.file_path} could not be found! Skipping this file.")
                # other logging mechanisms (e.g. the logging module) would also work
        final_clip = concatenate_videoclips(clips)
        os.makedirs("videos", exist_ok=True)
        output_path = f"videos/final_concatenated_video_{uuid.uuid4()}.mp4"
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
        return output_path

    def print_prompts(self):
        """Log the prompt used by each video."""
        for video in self.videos:
            print(f"Thread {video.index} prompt: {video.prompt}")

    def get_next_token(self):
        """Return the next API token in round-robin order (thread-safe)."""
        with self.lock:
            token = REPLICATE_API_TOKEN_LIST[self.token_index]
            self.token_index = (self.token_index + 1) % len(REPLICATE_API_TOKEN_LIST)
            return token


def main(args):
    """Generate all scene clips in parallel and return the merged video path."""
    thread_controller = ThreadController(args)
    thread_controller.run_threads()
    merged_video_path = thread_controller.merge_videos()
    thread_controller.print_prompts()
    return merged_video_path


def load_prompts(file_path):
    """Return the lines of *file_path* as a list of prompt strings."""
    with open(file_path, "r") as f:
        prompts = f.read().splitlines()
    return prompts


def get_filetext(filename):
    """Return the full text content of *filename*."""
    with open(filename, "r") as file:
        filetext = file.read()
    return filetext


def get_functions_from_schema(filename):
    """Load the JSON file *filename* and return its "functions" entry."""
    schema = get_filetext(filename)
    schema_json = json.loads(schema)
    functions = schema_json.get("functions")
    return functions


functions = get_functions_from_schema('schema.json')


class OpenAI:
    """Thin wrapper around the ChatCompletion API."""

    @classmethod
    def chat_completion_with_function(cls, prompt, messages, functions):
        """Call ChatCompletion, forcing the generate_video function call."""
        print("prompt:" + prompt)
        # measure how long text generation takes
        start = time.time()
        # call the ChatCompletion API
        response = openai.ChatCompletion.create(
            model=MODEL,
            messages=messages,
            functions=functions,
            function_call={"name": "generate_video"}
        )
        print("gpt generation time: " + str(time.time() - start))
        # pull the message out of the ChatCompletion response
        message = response.choices[0].message
        print("chat completion message: " + json.dumps(message, indent=2))
        return response


class NajiminoAI:
    """Drives the whole prompt -> scenes -> video -> HTML pipeline."""

    def __init__(self, user_message):
        self.user_message = user_message

    def generate_markdown(self, args, generation_time):
        """Render template.md with the generation args and elapsed time."""
        template_string = get_filetext(filename="template.md")
        template = Template(template_string)
        result = template.render(args=args, generation_time=generation_time)
        print(result)
        return result

    @classmethod
    def generate(cls, user_message):
        """Convenience entry point: build an instance and run the pipeline."""
        najiminoai = NajiminoAI(user_message)
        return najiminoai.create_video()

    def create_video(self):
        """Ask GPT for scene definitions, generate the video, build HTML.

        Returns a ``[video_path, html]`` pair; both are None when GPT did not
        return a function call.
        """
        main_start_time = time.time()
        user_message = self.user_message + f" {NUMBER_OF_SCENES}シーン"
        messages = [
            {"role": "user", "content": user_message}
        ]
        functions = get_functions_from_schema('schema.json')
        response = OpenAI.chat_completion_with_function(
            prompt=user_message, messages=messages, functions=functions)
        message = response.choices[0].message
        total_tokens = response.usage.total_tokens
        video_path = None
        html = None
        # bail out when the model answered with plain text instead of a call
        if message.get("function_call") is None:
            print("message: " + json.dumps(message, indent=2))
            return [video_path, html]
        function_name = message["function_call"]["name"]
        args = json.loads(message["function_call"]["arguments"])
        print("args: " + json.dumps(args, indent=2))
        video_path = main(args)
        main_end_time = time.time()
        main_duration = main_end_time - main_start_time
        print("Thread Main start time:", main_start_time)
        print("Thread Main end time:", main_end_time)
        print("Thread Main duration:", main_duration)
        print("All threads finished.")
        function_response = self.generate_markdown(args, main_duration)
        # NOTE(review): the source file was truncated mid-expression here and
        # the HTML wrapper strings around the rendered markdown were lost —
        # restore the original markup before shipping.
        html = (
            ""
            + markdown2.markdown(function_response, extras=["tables"])
            + ""
        )
        return [video_path, html]