|
|
|
import discord, os, io, re, asyncio, logging, requests, replicate, subprocess |
|
from transformers import pipeline as transformers_pipeline |
|
|
|
|
|
# --- Configuration read from environment variables ---------------------------

# Discord bot token handed to Client.run().
TOKEN = os.getenv("DISCORD_TOKEN")

# Read the channel id as text first: calling int() on a missing variable would
# raise TypeError before the check below could report the problem. A missing
# or empty value becomes 0, which that check rejects; a non-numeric value
# still raises ValueError from int().
_channel_id_text = (os.getenv("DISCORD_CHANNEL_ID") or "").strip()
CHANNEL_ID = int(_channel_id_text) if _channel_id_text else 0

# The Replicate access token is read from OPENAI_API_KEY (see the error
# message below); HF_TOKEN is optional and may be empty.
REPL_TOKEN = (os.getenv("OPENAI_API_KEY") or "").strip()
HF_TOKEN = (os.getenv("HF_TOKEN") or "").strip()

if not TOKEN or not CHANNEL_ID:
    raise RuntimeError("DISCORD_TOKEN 과 DISCORD_CHANNEL_ID 환경 변수를 모두 지정하세요.")

if not REPL_TOKEN:
    raise RuntimeError(
        "OPENAI_API_KEY 환경 변수에 Replicate Personal Access Token 값을 넣어주세요."
    )

# Re-export the token under REPLICATE_API_TOKEN, presumably the variable the
# replicate package looks up when no explicit client is configured.
os.environ["REPLICATE_API_TOKEN"] = REPL_TOKEN
|
|
|
|
|
# Model reference handed to replicate.run(); the text after the colon looks
# like a pinned version hash of bytedance/sdxl-lightning-4step.
MODEL = "bytedance/sdxl-lightning-4step:6f7a773af6fc3e8de9d5a3c00be77c17308914bf67772726aff83496ba1e3bbe"  # noqa: E501
|
|
|
|
|
# Keyword arguments for the Hugging Face pipeline. device=-1 is the value
# transformers documents for running on the CPU.
translator_kwargs = dict(device=-1)
if HF_TOKEN:
    # Forward the access token only when one was configured.
    translator_kwargs.update(token=HF_TOKEN)

# Translation pipeline for the Helsinki-NLP/opus-mt-ko-en model, created once
# at import time and used by ko2en().
translator = transformers_pipeline(
    "translation", model="Helsinki-NLP/opus-mt-ko-en", **translator_kwargs
)
|
|
|
def ko2en(text: str) -> str:
    """Translate text containing Hangul to English; return other text as is.

    Args:
        text: The text to inspect and, if needed, translate.

    Returns:
        The stripped translation produced by the module-level ``translator``
        when ``text`` contains at least one Hangul syllable. ``text`` itself
        is returned when it contains none, or when translation fails for any
        reason (the failure is logged as a warning).
    """
    # Hangul Syllables block (U+AC00 to U+D7A3). Written with escapes so the
    # pattern does not depend on how the source file is encoded or decoded.
    if not re.search(r"[\uac00-\ud7a3]", text):
        return text

    try:
        return translator(text, max_length=512)[0]["translation_text"].strip()
    except Exception as e:
        # Best effort: an untranslated prompt is still usable downstream.
        logging.warning("번역 실패, 원문 사용: %s", e)
        return text
|
|
|
|
|
# Root logger setup: INFO and above, each record prefixed with a timestamp and
# its level, emitted through a single stream handler.
logging.basicConfig(
    handlers=(logging.StreamHandler(),),
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Gateway intents: the library defaults plus message content, which
# on_message needs in order to read message.content.
intents = discord.Intents.default()
intents.message_content = True
|
|
|
class ImageBot(discord.Client):
    """Discord client that answers prompts in one channel with generated images.

    The text of a message posted in the channel identified by ``CHANNEL_ID``
    is passed through ``ko2en`` and used as the prompt for the Replicate model
    ``MODEL``; the resulting images are sent back as a reply to that message.
    """

    # Seconds to wait for a single image download before giving up on it.
    _DOWNLOAD_TIMEOUT = 60

    # Handle of the web.py child process started by on_ready; None until the
    # first successful start.
    _web_process = None

    async def on_ready(self):
        """Log the login and start the companion ``web.py`` process once."""
        logging.info("Logged in as %s (id=%s)", self.user, self.user.id)

        # on_ready may be dispatched again after a reconnect, so only spawn
        # web.py when no previously started child is still running.
        if self._web_process is not None and self._web_process.poll() is None:
            return

        try:
            self._web_process = subprocess.Popen(["python", "web.py"])
            logging.info("web.py server has been started.")
        except Exception as e:
            # Best effort: the bot keeps working without the web server.
            logging.warning("web.py 실행 실패: %s", e)

    async def on_message(self, message: discord.Message):
        """Reply to a prompt posted in the configured channel with images.

        Messages written by the bot itself, messages from any other channel
        and messages without text are ignored.
        """
        if message.author.id == self.user.id or message.channel.id != CHANNEL_ID:
            return

        prompt = message.content.strip()
        if not prompt:
            return

        loop = asyncio.get_running_loop()

        # Translation, the Replicate call and the downloads are all blocking,
        # so they run in the default thread pool instead of on the event loop.
        # The typing indicator is held for as long as that work takes.
        async with message.channel.typing():
            try:
                outputs = await loop.run_in_executor(None, self._generate, prompt)
            except Exception as e:
                logging.exception("Replicate error: %s", e)
                await message.reply("⚠️ 이미지 생성 실패!")
                return

            downloads = await loop.run_in_executor(None, self._download, outputs)

        # File names keep the index of the model output they came from.
        files = [
            discord.File(buffer, filename=f"img_{idx}.png")
            for idx, buffer in downloads
        ]

        await message.reply(
            files=files if files else None,
            content=None if files else "⚠️ 이미지를 전송할 수 없습니다.",
        )

    @staticmethod
    def _generate(prompt):
        """Translate ``prompt`` and run the model (blocking); return its outputs as a list."""
        prompt_en = ko2en(prompt)
        return list(replicate.run(MODEL, input={"prompt": prompt_en}))

    def _download(self, outputs):
        """Load the bytes of every model output (blocking).

        An output exposing ``read()`` is read directly; anything else is
        treated as a URL and fetched over HTTP. Outputs that cannot be loaded
        are logged as warnings and skipped.

        Returns:
            A list of ``(index, buffer)`` pairs, where ``index`` is the
            position of the output in ``outputs`` and ``buffer`` is an
            ``io.BytesIO`` holding its data.
        """
        fetched = []
        for idx, item in enumerate(outputs):
            try:
                if hasattr(item, "read"):
                    data = item.read()
                else:
                    response = requests.get(item, timeout=self._DOWNLOAD_TIMEOUT)
                    # Do not post an HTTP error page as if it were an image.
                    response.raise_for_status()
                    data = response.content
                buffer = io.BytesIO(data)
            except Exception as e:
                logging.warning("[IMG %s] 처리 실패: %s", idx, e)
            else:
                fetched.append((idx, buffer))
        return fetched
|
|
|
|
|
if __name__ == "__main__":
    # No replicate.Client is built here: an instance that is created and then
    # discarded has no effect, and replicate.run() is expected to pick up the
    # token from the REPLICATE_API_TOKEN environment variable set above.
    ImageBot(intents=intents).run(TOKEN)
|
|