import gradio as gr
import openai
import sys
import os
import json
import threading
import time
import requests
import argparse
import markdown2
from dotenv import load_dotenv
from IPython.display import Image
from moviepy.editor import VideoFileClip, concatenate_videoclips, ImageClip
from moviepy.video.fx.all import fadein, fadeout
from PIL import Image as PIL_Image
from jinja2 import Template

load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")
ENV = os.getenv("ENV")

MODEL = "gpt-3.5-turbo"
# MODEL = "gpt-4"

# Use the real Replicate client in production, a local stub otherwise
if ENV == "PRODUCTION":
    import replicate
else:
    from stub import replicate


class Video:
    """One scene rendered to a short clip via Replicate's animate-diff model."""

    def __init__(self, scene, index):
        self.scene = scene
        self.prompt = (
            "masterpiece, awards, best quality, dramatic-lighting, "
            + scene.get("visual_prompt_in_en")
            + ", cinematic-angles-"
            + scene.get("cinematic_angles")
        )
        # Fixed: attribute was misspelled "nagative_prompt"
        self.negative_prompt = "badhandv4, easynegative, ng_deepnegative_v1_75t, verybadimagenegative_v1.3, bad-artist, bad_prompt_version2-neg, nsfw, "
        self.index = index
        self.output_url = None
        self.file_path = f"assets/thread_{index}_video.mp4"

    def run_replicate(self):
        start_time = time.time()
        self.output_url = replicate.run(
            "lucataco/animate-diff:1531004ee4c98894ab11f8a4ce6206099e732c1da15121987a8eef54828f0663",
            input={
                "motion_module": "mm_sd_v14",
                "prompt": self.prompt,
                "n_prompt": self.negative_prompt,
            }
        )
        end_time = time.time()
        duration = end_time - start_time
        self.download_and_save(url=self.output_url, file_path=self.file_path)
        self.print_thread_info(start_time, end_time, duration)

    def download_and_save(self, url, file_path):
        response = requests.get(url)
        with open(file_path, "wb") as f:
            f.write(response.content)

    def print_thread_info(self, start_time, end_time, duration):
        print(f"Thread {self.index} output_url: {self.output_url}")
        print(f"Thread {self.index} start time: {start_time}")
        print(f"Thread {self.index} end time: {end_time}")
        print(f"Thread {self.index} duration: {duration}")


class ThreadController:
    """Renders all scenes in parallel, one worker thread per scene."""

    def __init__(self, args):
        self.args = args
        scenes = args.get("scenes")
        # Fixed: was len(args), which counts the dict's keys, not the scenes
        self.num_threads = len(scenes)
        self.videos = [Video(scene, index) for index, scene in enumerate(scenes)]
        self.threads = []

    def run_threads(self):
        os.makedirs("assets", exist_ok=True)
        for video in self.videos:
            thread = threading.Thread(target=video.run_replicate)
            self.threads.append(thread)
            thread.start()
        for thread in self.threads:
            thread.join()

    def merge_videos(self):
        clips = []
        for video in self.videos:
            clips.append(VideoFileClip(video.file_path))
        final_clip = concatenate_videoclips(clips)
        os.makedirs("videos", exist_ok=True)
        output_path = "videos/final_concatenated_video.mp4"
        final_clip.write_videofile(output_path, codec='libx264', fps=24)
        return output_path

    def print_prompts(self):
        for video in self.videos:
            print(f"Thread {video.index} prompt: {video.prompt}")


def main(args):
    thread_controller = ThreadController(args)
    thread_controller.run_threads()
    merged_video_path = thread_controller.merge_videos()
    thread_controller.print_prompts()
    return merged_video_path


def load_prompts(file_path):
    with open(file_path, "r") as f:
        prompts = f.read().splitlines()
    return prompts


def get_filetext(filename):
    with open(filename, "r") as file:
        filetext = file.read()
    return filetext
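
# For reference, a minimal sketch of the schema.json consumed by
# get_functions_from_schema() below. Only the top-level "functions" key and
# the "generate_video" function name appear in this script; the parameter
# fields are assumptions inferred from how `args` is used above
# (scenes, visual_prompt_in_en, cinematic_angles).
#
# {
#   "functions": [
#     {
#       "name": "generate_video",
#       "parameters": {
#         "type": "object",
#         "properties": {
#           "scenes": {
#             "type": "array",
#             "items": {
#               "type": "object",
#               "properties": {
#                 "scene": {"type": "string"},
#                 "cinematic_angles": {"type": "string"},
#                 "visual_prompt_in_en": {"type": "string"}
#               }
#             }
#           }
#         }
#       }
#     }
#   ]
# }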

def get_functions_from_schema(filename):
    schema = get_filetext(filename)
    schema_json = json.loads(schema)
    functions = schema_json.get("functions")
    return functions


functions = get_functions_from_schema('schema.json')


class OpenAI:
    @classmethod
    def chat_completion_with_function(cls, prompt, messages, functions):
        print("prompt: " + prompt)

        # Measure how long the chat completion takes
        start = time.time()

        # Call the ChatCompletion API, forcing a call to generate_video
        response = openai.ChatCompletion.create(
            model=MODEL,
            messages=messages,
            functions=functions,
            function_call={"name": "generate_video"}
        )
        print("gpt generation time: " + str(time.time() - start))

        # Extract the message returned by the ChatCompletion API
        message = response.choices[0].message
        print("chat completion message: " + json.dumps(message, indent=2))

        return message


class NajiminoAI:
    def __init__(self, user_message):
        self.user_message = user_message

    def generate_markdown(self, args, generation_time):
        template_string = get_filetext(filename="template.md")
        template = Template(template_string)
        result = template.render(args=args, generation_time=generation_time)
        print(result)
        return result

    @classmethod
    def generate(cls, user_message):
        najiminoai = NajiminoAI(user_message)
        return najiminoai.create_video()

    def create_video(self):
        main_start_time = time.time()

        # "4シーン" asks the model for four scenes
        user_message = self.user_message + " 4シーン"
        messages = [
            {"role": "user", "content": user_message}
        ]

        functions = get_functions_from_schema('schema.json')

        message = OpenAI.chat_completion_with_function(
            prompt=user_message, messages=messages, functions=functions
        )

        video_path = None
        html = None

        if message.get("function_call") is None:
            print("message: " + json.dumps(message, indent=2))
            return [video_path, html]

        function_name = message["function_call"]["name"]
        args = json.loads(message["function_call"]["arguments"])
        print("args: " + json.dumps(args, indent=2))
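        # Illustrative shape of the parsed `args` handed to main(). The values
        # are invented for illustration; the field names come from how Video
        # and ThreadController read each scene above:
        # {
        #   "scenes": [
        #     {"scene": "...", "cinematic_angles": "close-up",
        #      "visual_prompt_in_en": "a lone astronaut on a red dune"},
        #     ...
        #   ]
        # }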
print("All threads finished.") function_response = self.generate_markdown(args, main_duration) html = ( "
" + markdown2.markdown(function_response,extras=["tables"]) + "