# Spaces: Running
"""
Professional WhatsApp Bot using Green-API
Author: Assistant
Description: A comprehensive WhatsApp bot with a professional, class-based structure.
Features include image generation, image editing, voice replies,
and various utility functions, all handled by an asynchronous task queue.
Includes request logging and chat ID filtering.
Version 2.3.0: Fixed image generation ID mismatch issue.
"""
import base64
import hmac
import json
import logging
import os
import queue
import threading
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Union, Literal, Dict, Any, Tuple, Set
from urllib.parse import quote

import requests
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError

# Assume these are your custom libraries for AI functionalities
import gemini_flash_lib
from FLUX import generate_image
from polLLM import generate_llm, LLMBadRequestError
from VoiceReply import generate_voice_reply
# --- Configuration --------------------------------------------------------- | |
class BotConfig:
    """Bot settings: fixed defaults plus credentials read from the environment.

    Instantiating the class reads the four required credentials and the
    optional ``LOG_LEVEL`` from the environment.

    Raises:
        ValueError: if any required environment variable is unset or empty.
    """

    # Names of the environment variables that must be present and non-empty.
    _REQUIRED_ENV_VARS = (
        "GREEN_API_URL",
        "GREEN_API_TOKEN",
        "GREEN_API_ID_INSTANCE",
        "WEBHOOK_AUTH_TOKEN",
    )

    # Credentials; assigned per instance in __init__.
    GREEN_API_URL: str
    GREEN_API_TOKEN: str
    GREEN_API_ID_INSTANCE: str
    WEBHOOK_AUTH_TOKEN: str

    # Working directories for images, audio and edit scratch files.
    IMAGE_DIR: str = "/tmp/whatsapp_images"
    AUDIO_DIR: str = "/tmp/whatsapp_audio"
    TEMP_DIR: str = "/tmp/whatsapp_edit"

    DEFAULT_IMAGE_COUNT: int = 4
    MAX_HISTORY_SIZE: int = 20
    WORKER_THREADS: int = 4

    # Set log level to DEBUG to capture all incoming requests
    LOG_LEVEL: str = "DEBUG"

    # Whitelisted chat IDs. The bot will only respond to these chats.
    ALLOWED_CHATS: Set[str] = {"[email protected]", "[email protected]"}

    def __init__(self):
        """Load credentials and the log level from the environment, then validate."""
        for name in self._REQUIRED_ENV_VARS:
            setattr(self, name, os.getenv(name))
        # The log level may be overridden from the environment; it is upper-cased.
        level = os.getenv("LOG_LEVEL", "DEBUG")
        self.LOG_LEVEL = level.upper()
        self._validate()

    def _validate(self):
        """Raise ``ValueError`` naming every required variable that is unset or empty."""
        missing = []
        for name in self._REQUIRED_ENV_VARS:
            if not getattr(self, name):
                missing.append(name)
        if not missing:
            return
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
# --- Logging Setup --------------------------------------------------------- | |
class LoggerSetup:
    """Sets up and manages structured logging for the application."""

    @staticmethod
    def setup(level: str) -> logging.Logger:
        """Configure and return the ``whatsapp_bot`` logger.

        Declared as a static method so it works both as ``LoggerSetup.setup(...)``
        and on an instance.

        Args:
            level: The logging level (e.g., "DEBUG", "INFO").

        Returns:
            The configured logger, carrying exactly one stream handler.
        """
        logger = logging.getLogger("whatsapp_bot")
        logger.setLevel(level)

        # Drop handlers left by an earlier call so output is not duplicated.
        logger.handlers.clear()

        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - [%(levelname)s] - [%(chat_id)s] - %(funcName)s:%(lineno)d - %(message)s"
        )
        handler.setFormatter(formatter)

        class ContextFilter(logging.Filter):
            """Injects the current thread's chat ID into each log record."""

            def filter(self, record):
                # get_context() always returns a "chat_id" key (None when unset),
                # so a dict default would never apply; fall back with `or`.
                record.chat_id = ThreadContext.get_context().get("chat_id") or "NO_CONTEXT"
                return True

        handler.addFilter(ContextFilter())
        logger.addHandler(handler)
        return logger
# --- Thread Context Management --------------------------------------------- | |
class ThreadContext:
    """Manages thread-local context for chat and message IDs."""

    # One thread-local namespace shared by the class; each thread sees its own values.
    _context = threading.local()

    @classmethod
    def set_context(cls, chat_id: str, message_id: str) -> None:
        """Store the chat and message IDs for the calling thread.

        Args:
            chat_id: ID of the chat being handled by this thread.
            message_id: ID of the message being handled by this thread.
        """
        cls._context.chat_id = chat_id
        cls._context.message_id = message_id

    @classmethod
    def get_context(cls) -> Dict[str, Optional[str]]:
        """Return the calling thread's context.

        Returns:
            A dict with keys ``"chat_id"`` and ``"message_id"``; each value is
            ``None`` if ``set_context`` has not been called on this thread.
        """
        return {
            "chat_id": getattr(cls._context, "chat_id", None),
            "message_id": getattr(cls._context, "message_id", None),
        }
# --- Conversation History ------------------------------------------------- | |
class ConversationManager:
    """Keeps a bounded, per-chat transcript of user and assistant lines."""

    def __init__(self, max_size: int):
        """Create an empty store whose per-chat transcripts hold at most ``max_size`` lines."""

        def _new_transcript():
            # A bounded deque silently drops the oldest line once it is full.
            return deque(maxlen=max_size)

        self.history = defaultdict(_new_transcript)

    def add_user_message(self, chat_id: str, message: str):
        """Append ``message`` to the chat's transcript, labelled as the user's."""
        transcript = self.history[chat_id]
        transcript.append("User: " + f"{message}")

    def add_bot_message(self, chat_id: str, message: str):
        """Append ``message`` to the chat's transcript, labelled as the assistant's."""
        transcript = self.history[chat_id]
        transcript.append("Assistant: " + f"{message}")

    def get_history_text(self, chat_id: str) -> str:
        """Return the chat's transcript as newline-separated lines ("" when empty)."""
        lines = self.history[chat_id]
        return "\n".join(lines)

    def clear_history(self, chat_id: str):
        """Remove every line stored for ``chat_id``."""
        transcript = self.history[chat_id]
        transcript.clear()
# --- Green-API Client ----------------------------------------------------- | |
class GreenApiClient:
    """A client for interacting with the Green-API for WhatsApp."""

    def __init__(self, config: "BotConfig", logger: logging.Logger):
        """Store the configuration and logger and open a reusable HTTP session."""
        self.config = config
        self.logger = logger
        self.session = requests.Session()
        self.base_url = (
            f"{self.config.GREEN_API_URL}/waInstance"
            f"{self.config.GREEN_API_ID_INSTANCE}"
        )

    def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Call a Green-API endpoint, trying up to three times.

        Args:
            method: HTTP method name.
            endpoint: Endpoint name placed between the instance URL and the token.
            **kwargs: Passed through to ``Session.request`` (e.g. ``json``,
                ``data``, ``files``).

        Returns:
            The decoded JSON body, or ``None`` if every attempt failed.
        """
        token = self.config.GREEN_API_TOKEN
        url = f"{self.base_url}/{endpoint}/{token}"
        for attempt in range(3):
            try:
                # Log the endpoint only: the full URL embeds the API token.
                self.logger.debug(
                    f"Sending API request to {endpoint} with payload: "
                    f"{kwargs.get('json', kwargs.get('data'))}"
                )
                response = self.session.request(method, url, timeout=20, **kwargs)
                response.raise_for_status()
                return response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON. Exception
                # text may quote the URL, so mask the token before logging.
                detail = str(e).replace(token, "***") if token else str(e)
                self.logger.warning(
                    f"API request to {endpoint} failed (attempt {attempt + 1}): {detail}"
                )
        self.logger.error(f"API request to {endpoint} failed after all retries.")
        return None

    def send_message(self, chat_id: str, text: str, quoted_message_id: Optional[str] = None):
        """Send a text message, optionally quoting ``quoted_message_id``.

        Returns:
            The API's JSON response, or ``None`` on failure.
        """
        payload = {"chatId": chat_id, "message": text}
        if quoted_message_id:
            payload["quotedMessageId"] = quoted_message_id
        return self._request("POST", "sendMessage", json=payload)

    def send_file(self, chat_id: str, file_path: str, caption: str = "",
                  quoted_message_id: Optional[str] = None):
        """Upload and send a file (image or audio).

        Returns:
            The API's JSON response, or ``None`` on failure.

        Raises:
            OSError: if ``file_path`` cannot be read.
        """
        filename = os.path.basename(file_path)
        payload = {"chatId": chat_id, "caption": caption}
        if quoted_message_id:
            payload["quotedMessageId"] = quoted_message_id
        # Read the bytes up front: an open file handle would sit at EOF after a
        # failed attempt, so a retry would upload an empty body.
        with open(file_path, "rb") as f:
            content = f.read()
        files = {"file": (filename, content)}
        return self._request("POST", "sendFileByUpload", data=payload, files=files)

    def download_file(self, url: str) -> Optional[bytes]:
        """Download ``url`` and return its bytes, or ``None`` if the request fails."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.content
        except requests.RequestException as e:
            self.logger.error(f"Failed to download file from {url}: {e}")
            return None
# --- Pydantic Models for Intent Recognition -------------------------------- | |
class BaseIntent(BaseModel):
    """Common base for intents; ``action`` names the function to invoke."""

    action: str


class SummarizeIntent(BaseIntent):
    """Intent for the ``summarize`` action, carrying the text to summarize."""

    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    """Intent for the ``translate`` action: target language and source text."""

    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    """Intent for the ``joke`` action; takes no arguments."""

    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    """Intent for the ``weather`` action, carrying the location to look up."""

    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    """Intent for the ``inspire`` action; takes no arguments."""

    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    """Intent for the ``meme`` action, carrying the meme text."""

    action: Literal["meme"]
    text: str


class EditImageIntent(BaseIntent):
    """Intent for the ``edit_image`` action, carrying the edit instructions."""

    action: Literal["edit_image"]
    prompt: str
class GenerateImageIntent(BaseIntent):
    """Intent for the ``generate_image`` action.

    Derives from ``BaseIntent`` like every other intent, so code typed against
    ``BaseIntent`` also covers image generation.
    """

    action: Literal["generate_image"]
    prompt: str
    # Number of images to produce; validated to the range 1-10.
    count: int = Field(default=1, ge=1, le=10)
    # Optional output dimensions; each validated to 512-2048 when given.
    width: Optional[int] = Field(default=None, ge=512, le=2048)
    height: Optional[int] = Field(default=None, ge=512, le=2048)
class SendTextIntent(BaseIntent):
    """Intent for the ``send_text`` action, carrying the plain-text reply."""

    action: Literal["send_text"]
    message: str
# --- Intent Router -------------------------------------------------------- | |
class IntentRouter:
    """Recognizes user intent using an LLM and routes to appropriate actions."""

    # Tried in order; the first model that validates the parsed JSON wins.
    INTENT_MODELS = [
        SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
        InspireIntent, MemeIntent, GenerateImageIntent, EditImageIntent, SendTextIntent
    ]

    def __init__(self, conv_manager: "ConversationManager", logger: logging.Logger):
        """Store the conversation manager (for history) and the logger."""
        self.conv_manager = conv_manager
        self.logger = logger

    def get_intent(self, user_input: str, chat_id: str) -> "BaseIntent":
        """Ask the LLM which function to invoke for ``user_input``.

        Args:
            user_input: The user's message text.
            chat_id: Chat whose stored history is included in the prompt.

        Returns:
            A validated intent model. If the LLM raises ``LLMBadRequestError``
            the chat history is cleared and a ``send_text`` intent is returned.
        """
        history_text = self.conv_manager.get_history_text(chat_id)
        system_prompt = self._build_system_prompt(history_text, user_input)
        try:
            raw_response = generate_llm(system_prompt)
        except LLMBadRequestError:
            self.logger.warning(f"LLM request failed due to bad request for chat {chat_id}. Clearing history.")
            self.conv_manager.clear_history(chat_id)
            return SendTextIntent(action="send_text", message="Oops! Let's start fresh! π")
        return self._parse_response(raw_response)

    def _build_system_prompt(self, history: str, user_input: str) -> str:
        """Build the dispatcher prompt from the function list, history and message."""
        return (
            "You are a function dispatcher. You only invoke functions by returning a single JSON object.\n"
            "Available functions:\n"
            "- summarize(text): Summarize given text\n"
            "- translate(lang, text): Translate text to a language\n"
            "- joke(): Tell a random joke\n"
            "- weather(location): Get weather for a location\n"
            "- inspire(): Get an inspirational quote\n"
            "- meme(text): Generate a meme from text\n"
            "- generate_image(prompt, count, width, height): Generate images\n"
            "- edit_image(prompt): Edit an image (requires replying to an image)\n"
            "- send_text(message): Send a plain text response\n\n"
            "Return only raw JSON. Examples:\n"
            '{"action":"generate_image","prompt":"a red fox","count":2}\n'
            '{"action":"edit_image","prompt":"make the sky purple"}\n'
            '{"action":"send_text","message":"Hello there!"}\n\n'
            f"Conversation history:\n{history}\n\n"
            f"Current message: User: {user_input}"
        )

    def _parse_response(self, raw_response: str) -> "BaseIntent":
        """Turn the LLM's raw reply into an intent model.

        Returns:
            The first model in ``INTENT_MODELS`` that validates the decoded
            JSON. A reply that is not JSON, or matches no model, is returned
            verbatim as a ``send_text`` intent. A missing, non-string or blank
            reply yields a ``send_text`` intent with an apology.
        """
        # A None/non-string reply would crash on .strip(), and a blank one
        # would become an empty outgoing message.
        if not isinstance(raw_response, str) or not raw_response.strip():
            self.logger.warning(f"LLM returned an empty or non-text response: {raw_response!r}")
            return SendTextIntent(
                action="send_text",
                message="Sorry, I didn't catch that. Could you try again?",
            )
        try:
            # Clean the response to ensure it's valid JSON
            cleaned_response = raw_response.strip().replace("`json", "").replace("`", "")
            parsed = json.loads(cleaned_response)
        except json.JSONDecodeError:
            self.logger.warning(f"Could not decode LLM response to JSON: {raw_response}")
        else:
            for model in self.INTENT_MODELS:
                try:
                    return model.model_validate(parsed)
                except ValidationError:
                    continue
            self.logger.warning(f"LLM response matched no known intent: {raw_response}")
        # Fallback for non-JSON or unparsable responses
        return SendTextIntent(action="send_text", message=raw_response)
# --- Main Application Class ------------------------------------------------ | |
class WhatsAppBot:
    """Webhook-driven WhatsApp bot built on Green-API.

    Authenticated webhook payloads are handed to a thread pool for parsing and
    turned into task dicts on ``task_queue``. Daemon worker threads take the
    tasks and dispatch each to the matching ``_task_<type>`` method.
    """

    # NOTE(review): the route decorators are not visible in the original
    # source, so these paths are assumptions - confirm them against the webhook
    # URL configured for the Green-API instance before deploying.
    WEBHOOK_PATH = "/whatsapp"
    HEALTH_PATH = "/health"

    def __init__(self, config: "BotConfig"):
        """Wire up logging, the API client, caches and routes, then start workers."""
        self.config = config
        self.logger = LoggerSetup.setup(config.LOG_LEVEL)
        self.api_client = GreenApiClient(config, self.logger)
        self.conv_manager = ConversationManager(config.MAX_HISTORY_SIZE)
        self.intent_router = IntentRouter(self.conv_manager, self.logger)
        self.task_queue = queue.Queue()
        # Pool that parses webhook payloads. Owned by the bot so the webhook
        # does not depend on a module-level global existing.
        self.executor = ThreadPoolExecutor(max_workers=config.WORKER_THREADS * 2)
        # Cache for user-sent image message IDs and their download URLs
        self.image_cache = defaultdict(lambda: deque(maxlen=50))
        # Cache for bot-sent image message IDs and their local file paths
        self.sent_image_path_cache = defaultdict(lambda: deque(maxlen=50))
        self.fastapi_app = FastAPI(title="WhatsApp Eve Bot", version="2.3.0")
        self._setup_routes()
        self._start_workers()

    def _setup_routes(self):
        """Define the webhook and health handlers and register them on the app."""

        async def webhook(request: Request):
            supplied = request.headers.get("Authorization") or ""
            expected = f"Bearer {self.config.WEBHOOK_AUTH_TOKEN}"
            # Constant-time comparison; bytes are used so non-ASCII input
            # cannot make compare_digest raise.
            if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
                self.logger.warning("Unauthorized webhook access attempt.")
                raise HTTPException(403, "Unauthorized")
            try:
                payload = await request.json()
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                self.logger.error("Failed to decode JSON from webhook request.")
                raise HTTPException(400, "Invalid JSON payload.")
            if not isinstance(payload, dict):
                self.logger.error("Webhook payload is not a JSON object.")
                raise HTTPException(400, "Invalid JSON payload.")
            self.logger.debug(f"Incoming webhook payload: {json.dumps(payload, indent=2)}")
            if payload.get("typeWebhook") == "incomingMessageReceived":
                self.executor.submit(self._process_incoming_message, payload)
            else:
                self.logger.info(f"Received non-message webhook type: {payload.get('typeWebhook')}")
            return JSONResponse(content={"status": "received"})

        def health_check():
            return JSONResponse(content={"status": "healthy"})

        # Without registration the handlers above would never be reachable.
        self.fastapi_app.post(self.WEBHOOK_PATH)(webhook)
        self.fastapi_app.get(self.HEALTH_PATH)(health_check)

    def _start_workers(self):
        """Start ``WORKER_THREADS`` daemon threads that consume ``task_queue``."""
        for i in range(self.config.WORKER_THREADS):
            threading.Thread(target=self._worker, name=f"Worker-{i}", daemon=True).start()
        self.logger.info(f"Started {self.config.WORKER_THREADS} worker threads.")

    def _worker(self):
        """Worker thread to process tasks from the queue."""
        while True:
            task = self.task_queue.get()
            try:
                ThreadContext.set_context(task["chat_id"], task["message_id"])
                handler = getattr(self, f"_task_{task['type']}", None)
                if handler:
                    self.logger.debug(f"Processing task: {task['type']} for chat {task['chat_id']}")
                    handler(task)
                else:
                    self.logger.warning(f"Unknown task type received: {task['type']}")
            except Exception as e:
                # Keep the worker alive whatever a single task does.
                self.logger.error(f"Error processing task {task.get('type', 'N/A')}: {e}", exc_info=True)
            finally:
                self.task_queue.task_done()

    @staticmethod
    def _extract_content(message_data: Dict[str, Any]) -> Tuple[str, Optional[str], bool]:
        """Return ``(text, download_url, is_image_file)`` for a message body."""
        type_message = message_data.get("typeMessage")
        text, download_url, is_image_file = "", None, False
        if type_message in ("imageMessage", "documentMessage"):
            file_data = message_data.get("fileMessageData") or {}
            mime_type = file_data.get("mimeType") or ""
            if type_message == "imageMessage" or mime_type.startswith("image/"):
                is_image_file = True
                download_url = file_data.get("downloadUrl")
                text = (file_data.get("caption") or "").strip()
        elif type_message in ("quotedMessage", "extendedTextMessage"):
            text = ((message_data.get("extendedTextMessageData") or {}).get("text") or "").strip()
        elif type_message == "textMessage":
            text = ((message_data.get("textMessageData") or {}).get("textMessage") or "").strip()
        return text, download_url, is_image_file

    def _process_incoming_message(self, payload: Dict[str, Any]):
        """Main logic for handling an incoming message payload.

        Ignores chats outside ``ALLOWED_CHATS``, caches incoming image URLs,
        records the user's text and routes it to a slash command or the
        intent router. Errors are logged, never raised.
        """
        try:
            chat_id = payload["senderData"]["chatId"]
            message_id = payload["idMessage"]
            if chat_id not in self.config.ALLOWED_CHATS:
                self.logger.warning(f"Ignoring message from unauthorized chat ID: {chat_id}")
                return
            ThreadContext.set_context(chat_id, message_id)
            message_data = payload.get("messageData") or {}
            type_message = message_data.get("typeMessage")
            text, download_url, is_image_file = self._extract_content(message_data)
            if is_image_file and download_url:
                self.logger.debug(f"Caching incoming image message {message_id} from {chat_id}")
                self.image_cache[chat_id].append((message_id, download_url))
            if not text:
                self.logger.debug(f"Received message type '{type_message}' without actionable text from {chat_id}.")
                return
            self.logger.info(f"Processing text from {chat_id}: '{text}'")
            self.conv_manager.add_user_message(chat_id, text)
            intent = None
            if is_image_file:
                intent = self.intent_router.get_intent(text, chat_id)
                if isinstance(intent, EditImageIntent):
                    self.logger.info("Detected direct edit command in image caption.")
                    self.task_queue.put({
                        "type": "edit_image", "chat_id": chat_id, "message_id": message_id,
                        "prompt": intent.prompt, "input_source": download_url, "source_type": 'url',
                    })
                    return
            if text.startswith('/'):
                self._handle_command(chat_id, message_id, text, payload)
            else:
                # Reuse the intent already computed for an image caption
                # rather than calling the LLM a second time.
                self._handle_natural_language(chat_id, message_id, text, payload, intent)
        except KeyError as e:
            self.logger.error(f"Missing expected key in message payload: {e}. Payload: {payload}")
        except Exception as e:
            self.logger.error(f"Failed to process message payload: {e}", exc_info=True)

    def _handle_command(self, chat_id, message_id, text, payload):
        """Processes direct slash commands."""
        parts = text.lower().split()
        command = parts[0]
        # Arguments come from the original text so their case is preserved.
        args = text.split(maxsplit=1)[1] if len(parts) > 1 else ""
        if command == "/help":
            help_text = (
                "*π€ Eve's Command Center:*\n\n"
                "πΉ `/help` - Show this help message\n"
                "πΉ `/gen <prompt>` - Generate an image\n"
                "πΉ `/edit <prompt>` - Reply to an image to edit it\n"
                "πΉ `/joke` - Get a random joke\n"
                "πΉ `/inspire` - Receive an inspirational quote\n"
                "πΉ `/weather <location>` - Check the weather\n\n"
                "You can also just chat with me naturally!"
            )
            self.api_client.send_message(chat_id, help_text, message_id)
        elif command == "/gen":
            if not args:
                # Do not queue a generation batch for an empty prompt.
                self.api_client.send_message(
                    chat_id, "Please provide a prompt, e.g. /gen a red fox", message_id
                )
                return
            self.task_queue.put({
                "type": "generate_image", "chat_id": chat_id, "message_id": message_id,
                "prompt": args, "count": self.config.DEFAULT_IMAGE_COUNT,
            })
        elif command == "/edit":
            self._dispatch_edit_image(chat_id, message_id, args, payload)
        elif command == "/joke":
            self.task_queue.put({"type": "joke", "chat_id": chat_id, "message_id": message_id})
        elif command == "/inspire":
            self.task_queue.put({"type": "inspire", "chat_id": chat_id, "message_id": message_id})
        elif command == "/weather":
            self.task_queue.put({"type": "weather", "chat_id": chat_id, "message_id": message_id, "location": args})
        else:
            self.api_client.send_message(chat_id, "Unknown command. Type /help for options.", message_id)

    def _handle_natural_language(self, chat_id, message_id, text, payload, intent=None):
        """Processes natural language using the intent router.

        Args:
            intent: An intent already resolved for ``text``; when ``None`` the
                intent router is asked.
        """
        if intent is None:
            intent = self.intent_router.get_intent(text, chat_id)
        if intent.action == "edit_image":
            self._dispatch_edit_image(chat_id, message_id, intent.prompt, payload)
        elif hasattr(self, f"_task_{intent.action}"):
            self.task_queue.put({
                "chat_id": chat_id, "message_id": message_id,
                **intent.model_dump(), "type": intent.action,
            })
        else:
            self.logger.warning(f"No handler found for intent action: {intent.action}")
            self.api_client.send_message(chat_id, "Sorry, I'm not sure how to handle that.", message_id)

    @staticmethod
    def _lookup_cached(cache, chat_id, message_id):
        """Return the newest cached value for ``message_id`` in ``chat_id``, or ``None``."""
        # Membership test first so an unknown chat does not create an entry.
        if chat_id not in cache:
            return None
        for cached_id, value in reversed(cache[chat_id]):
            if cached_id == message_id:
                return value
        return None

    def _dispatch_edit_image(self, chat_id, message_id, prompt, payload):
        """Finds a replied-to image from cache and dispatches the edit task."""
        message_data = payload.get("messageData", {})
        if message_data.get("typeMessage") != "quotedMessage":
            self.api_client.send_message(chat_id, "To edit an image, please reply to an image with your instructions.", message_id)
            return
        quoted_info = message_data.get("quotedMessage", {})
        quoted_message_id = quoted_info.get("stanzaId")
        if not quoted_message_id:
            self.logger.error(f"Could not find stanzaId in quoted message: {quoted_info}")
            self.api_client.send_message(chat_id, "Sorry, I couldn't identify which image to edit.", message_id)
            return
        input_source, source_type = None, None
        # 1. Check cache for bot-sent images (local paths)
        path = self._lookup_cached(self.sent_image_path_cache, chat_id, quoted_message_id)
        if path is not None:
            if os.path.exists(path):
                input_source, source_type = path, 'path'
                self.logger.info(f"Found replied-to image in local path cache: {path}")
            else:
                self.logger.warning(f"Found path for message {quoted_message_id} but file missing: {path}")
        # 2. If not found, check cache for user-sent images (URLs)
        if not input_source:
            url = self._lookup_cached(self.image_cache, chat_id, quoted_message_id)
            if url is not None:
                input_source, source_type = url, 'url'
                self.logger.info(f"Found replied-to image in URL cache: {url}")
        if not input_source:
            self.logger.warning(f"Could not find any image source for message ID {quoted_message_id} in cache for chat {chat_id}.")
            self.api_client.send_message(chat_id, "I couldn't find the original image. It might be too old. Please send it again before editing.", message_id)
            return
        self.logger.info(f"Dispatching edit task for message {quoted_message_id} with source type '{source_type}'.")
        self.task_queue.put({
            "type": "edit_image", "chat_id": chat_id, "message_id": message_id,
            "prompt": prompt, "input_source": input_source, "source_type": source_type,
        })

    def _manage_cached_file(self, chat_id: str, message_id: str, file_path: str):
        """Adds a file to the cache and cleans up the oldest file if the cache is full."""
        cache = self.sent_image_path_cache[chat_id]
        # Appending to a full deque drops its leftmost entry, so delete that
        # entry's file first.
        if len(cache) == cache.maxlen:
            _, old_path = cache[0]
            if os.path.exists(old_path):
                self.logger.debug(f"Cache full. Removing oldest cached image file: {old_path}")
                try:
                    os.remove(old_path)
                except OSError as e:
                    self.logger.error(f"Error removing old cached file {old_path}: {e}")
        cache.append((message_id, file_path))

    def _safe_remove(self, path):
        """Delete ``path`` if it exists, logging instead of raising on failure."""
        if not path or not os.path.exists(path):
            return
        try:
            os.remove(path)
        except OSError as e:
            self.logger.error(f"Error removing file {path}: {e}")

    # --- Task Handler Methods ---
    def _task_send_text(self, task: Dict[str, Any]):
        """Send ``task["message"]``, record it in history and queue a voice reply."""
        chat_id, message_id, message = task["chat_id"], task["message_id"], task["message"]
        self.api_client.send_message(chat_id, message, message_id)
        self.conv_manager.add_bot_message(chat_id, message)
        self.task_queue.put({"type": "voice_reply", "chat_id": chat_id, "message_id": message_id, "text": message})

    def _task_generate_image(self, task: Dict[str, Any]):
        """Generate ``count`` images for the prompt and send each one.

        A sent image whose message ID is returned is cached for later editing;
        otherwise its file is deleted. One failed image does not stop the batch.
        """
        chat_id, mid, prompt = task["chat_id"], task["message_id"], task["prompt"]
        count = task.get("count", self.config.DEFAULT_IMAGE_COUNT)
        self.api_client.send_message(chat_id, f"π¨ Generating {count} image(s) for: \"{prompt}\"...", mid)
        for i in range(count):
            path = None
            try:
                # Create a unique ID for each image in the batch to avoid mismatches.
                unique_image_id = f"{mid}_{i}"
                generation_result = generate_image(
                    prompt,
                    unique_image_id,
                    unique_image_id,
                    self.config.IMAGE_DIR,
                    width=task.get("width"),
                    height=task.get("height")
                )
                # A falsy result means the generator produced nothing.
                if not generation_result:
                    self.logger.error(f"Image generation {i+1} failed because the generator returned None.")
                    self.api_client.send_message(chat_id, f"π’ Failed to generate image {i+1}.", mid)
                    continue
                _, path, _, _ = generation_result
                caption = f"β¨ Image {i+1}/{count}: {prompt}"
                response = self.api_client.send_file(chat_id, path, caption, mid)
                if response and response.get("idMessage"):
                    sent_message_id = response["idMessage"]
                    self.logger.info(f"Caching sent image path for message {sent_message_id}: {path}")
                    self._manage_cached_file(chat_id, sent_message_id, path)
                else:
                    # Without a message ID the image cannot be found for a
                    # later edit, so delete it rather than fill the disk.
                    self.logger.warning(f"Could not get sent message ID for {path}. Deleting file.")
                    self._safe_remove(path)
            except Exception as e:
                self.logger.error(f"Image generation {i+1} failed with an exception: {e}", exc_info=True)
                self.api_client.send_message(chat_id, f"π’ An error occurred while generating image {i+1}.", mid)
                # Cleanup must not raise here, or the remaining images are skipped.
                self._safe_remove(path)

    def _task_edit_image(self, task: Dict[str, Any]):
        """Edit the image at ``input_source`` (URL or local path) and send the result."""
        chat_id, mid, prompt = task["chat_id"], task["message_id"], task["prompt"]
        input_source, source_type = task["input_source"], task["source_type"]
        self.api_client.send_message(chat_id, f"π¨ Editing image with prompt: \"{prompt}\"...", mid)
        input_path_temp, output_path = None, None
        try:
            os.makedirs(self.config.TEMP_DIR, exist_ok=True)
            output_path = os.path.join(self.config.TEMP_DIR, f"output_{mid}.jpg")
            if source_type == 'url':
                image_data = self.api_client.download_file(input_source)
                if not image_data:
                    raise ValueError(f"Failed to download image from URL: {input_source}")
                input_path_temp = os.path.join(self.config.TEMP_DIR, f"input_{mid}.jpg")
                with open(input_path_temp, 'wb') as f:
                    f.write(image_data)
                edit_input_path = input_path_temp
            elif source_type == 'path':
                edit_input_path = input_source
            else:
                raise ValueError(f"Unknown source_type: {source_type}")
            gemini_flash_lib.generate_image(prompt, edit_input_path, download_path=output_path)
            if not os.path.exists(output_path):
                raise ValueError("Edited image file not found.")
            caption = f"β¨ Edited: {prompt}"
            response = self.api_client.send_file(chat_id, output_path, caption, mid)
            if response and response.get("idMessage"):
                sent_message_id = response["idMessage"]
                self.logger.info(f"Caching sent image path for message {sent_message_id}: {output_path}")
                self._manage_cached_file(chat_id, sent_message_id, output_path)
            else:
                self.logger.warning(f"Could not get sent message ID for {output_path}. Deleting file.")
                self._safe_remove(output_path)
        except Exception as e:
            self.logger.error(f"Image editing task failed: {e}", exc_info=True)
            self.api_client.send_message(chat_id, "π’ Sorry, I failed to edit the image.", mid)
            self._safe_remove(output_path)
        finally:
            # The downloaded input is scratch data either way.
            self._safe_remove(input_path_temp)

    def _task_voice_reply(self, task: Dict[str, Any]):
        """Generate a spoken version of ``task["text"]`` and send it; failures are logged."""
        text = task["text"]
        prompt = f"Say this in a friendly, playful, and slightly clumsy-cute way: {text}"
        try:
            result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=self.config.AUDIO_DIR)
            if result and result[0]:
                path, _ = result
                try:
                    self.api_client.send_file(task["chat_id"], path, quoted_message_id=task["message_id"])
                finally:
                    # Remove the audio file even when sending raises.
                    self._safe_remove(path)
        except Exception as e:
            self.logger.warning(f"Voice reply generation failed: {e}", exc_info=True)

    def _task_joke(self, task: Dict[str, Any]):
        """Send a joke from the joke API, falling back to the LLM on any failure."""
        try:
            j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
            joke = f"{j['setup']}\n\n{j['punchline']}"
        except Exception:
            self.logger.warning("Joke API failed, falling back to LLM.")
            joke = generate_llm("Tell me a short, clean joke.")
        self._task_send_text({"type": "send_text", **task, "message": f"π {joke}"})

    def _task_inspire(self, task: Dict[str, Any]):
        """Send an LLM-generated inspirational quote."""
        quote_text = generate_llm("Give me a unique, short, uplifting inspirational quote with attribution.")
        self._task_send_text({"type": "send_text", **task, "message": f"β¨ {quote_text}"})

    def _task_weather(self, task: Dict[str, Any]):
        """Fetch weather data for ``task["location"]`` and send an LLM-written report."""
        location = task.get("location")
        if not location:
            self.api_client.send_message(task["chat_id"], "Please provide a location for the weather.", task["message_id"])
            return
        try:
            # The location is user input: percent-encode it so characters such
            # as "?", "#" or "/" cannot alter the request. Spaces stay "+".
            place = quote(location.replace(' ', '+'), safe='+')
            raw = requests.get(f"http://wttr.in/{place}?format=4", timeout=10).text
            report = generate_llm(f"Create a friendly weather report in Celsius from this data:\n\n{raw}")
            self._task_send_text({"type": "send_text", **task, "message": f"π€οΈ Weather for {location}:\n{report}"})
        except Exception as e:
            self.logger.error(f"Weather task failed: {e}", exc_info=True)
            self.api_client.send_message(task["chat_id"], "Sorry, I couldn't get the weather.", task["message_id"])

    def run(self):
        """Starts the bot and FastAPI server."""
        self.logger.info("Starting Eve WhatsApp Bot...")
        for d in [self.config.IMAGE_DIR, self.config.AUDIO_DIR, self.config.TEMP_DIR]:
            os.makedirs(d, exist_ok=True)
            self.logger.debug(f"Ensured directory exists: {d}")
        self.api_client.send_message(
            "[email protected]",
            "π Eve is online and ready to help! Type /help to see commands."
        )
        uvicorn.run(self.fastapi_app, host="0.0.0.0", port=7860, log_config=None)
if __name__ == "__main__":
    # Script entry point: build the configuration, start the bot, and report
    # start-up failures. Failures exit with status 1 so a supervisor can tell
    # a crash from a clean stop.
    try:
        bot_config = BotConfig()
        # Module-level pool; code elsewhere in this file may submit work to
        # the global name `executor`, so it is created before the bot.
        executor = ThreadPoolExecutor(max_workers=bot_config.WORKER_THREADS * 2)
        bot = WhatsAppBot(bot_config)
        bot.run()
    except ValueError as e:
        # Raised by BotConfig when required environment variables are missing.
        logging.basicConfig()
        logging.getLogger().critical(f"β CONFIGURATION ERROR: {e}")
        raise SystemExit(1)
    except KeyboardInterrupt:
        print("\nπ Bot stopped by user.")
    except Exception as e:
        logging.getLogger().critical(f"β A fatal error occurred: {e}", exc_info=True)
        raise SystemExit(1)