#!/usr/bin/env python3
import os
import re
import tempfile
import gc
from collections.abc import Iterator
from threading import Thread
import json
import requests
import cv2
import gradio as gr
import spaces
import torch
import numpy as np
from loguru import logger
from PIL import Image
import time
import warnings
from typing import Dict, List, Optional, Union
import base64
from io import BytesIO
# llama-cpp-python for GGUF
from llama_cpp import Llama
from llama_cpp.llama_chat_format import Llava16ChatHandler
# Model download
from huggingface_hub import hf_hub_download
# CSV/TXT 분석
import pandas as pd
# PDF 텍스트 추출
import PyPDF2
warnings.filterwarnings('ignore')
print("🎮 로봇 시각 시스템 초기화 (Gemma3-4B GGUF Q4_K_M)...")
##############################################################################
# 상수 정의
##############################################################################
MAX_CONTENT_CHARS = 2000
MAX_INPUT_LENGTH = 2096
MAX_NUM_IMAGES = 5
SERPHOUSE_API_KEY = os.getenv("SERPHOUSE_API_KEY", "")
##############################################################################
# 전역 변수
##############################################################################
llm = None
model_loaded = False
model_name = "Gemma3-4B-GGUF-Q4_K_M"
##############################################################################
# 메모리 관리
##############################################################################
def clear_cuda_cache():
"""CUDA 캐시를 명시적으로 비웁니다."""
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
##############################################################################
# 키워드 추출 함수
##############################################################################
def extract_keywords(text: str, top_k: int = 5) -> str:
"""키워드 추출"""
text = re.sub(r"[^a-zA-Z0-9가-힣\s]", "", text)
tokens = text.split()
seen = set()
unique_tokens = []
for token in tokens:
if token not in seen and len(token) > 1:
seen.add(token)
unique_tokens.append(token)
key_tokens = unique_tokens[:top_k]
return " ".join(key_tokens)
##############################################################################
# 웹 검색 함수
##############################################################################
def do_web_search(query: str) -> str:
"""SerpHouse API를 사용한 웹 검색"""
try:
url = "https://api.serphouse.com/serp/live"
params = {
"q": query,
"domain": "google.com",
"serp_type": "web",
"device": "desktop",
"lang": "ko",
"num": "10"
}
headers = {
"Authorization": f"Bearer {SERPHOUSE_API_KEY}"
}
logger.info(f"웹 검색 중... 검색어: {query}")
response = requests.get(url, headers=headers, params=params, timeout=60)
response.raise_for_status()
data = response.json()
results = data.get("results", {})
organic = results.get("organic", []) if isinstance(results, dict) else []
if not organic:
return "검색 결과를 찾을 수 없습니다."
max_results = min(10, len(organic))
limited_organic = organic[:max_results]
summary_lines = []
for idx, item in enumerate(limited_organic, start=1):
title = item.get("title", "제목 없음")
link = item.get("link", "#")
snippet = item.get("snippet", "설명 없음")
displayed_link = item.get("displayed_link", link)
summary_lines.append(
f"### 결과 {idx}: {title}\n\n"
f"{snippet}\n\n"
f"**출처**: [{displayed_link}]({link})\n\n"
f"---\n"
)
instructions = """# 웹 검색 결과
아래는 검색 결과입니다. 답변 시 이 정보를 활용하세요:
1. 각 결과의 제목, 내용, 출처 링크를 참조하세요
2. 관련 출처를 명시적으로 인용하세요
3. 여러 출처의 정보를 종합하여 답변하세요
"""
search_results = instructions + "\n".join(summary_lines)
return search_results
except Exception as e:
logger.error(f"웹 검색 실패: {e}")
return f"웹 검색 실패: {str(e)}"
##############################################################################
# 문서 처리 함수
##############################################################################
def analyze_csv_file(path: str) -> str:
"""CSV 파일 분석"""
try:
df = pd.read_csv(path)
if df.shape[0] > 50 or df.shape[1] > 10:
df = df.iloc[:50, :10]
df_str = df.to_string()
if len(df_str) > MAX_CONTENT_CHARS:
df_str = df_str[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[CSV 파일: {os.path.basename(path)}]**\n\n{df_str}"
except Exception as e:
return f"CSV 읽기 실패 ({os.path.basename(path)}): {str(e)}"
def analyze_txt_file(path: str) -> str:
"""TXT 파일 분석"""
try:
with open(path, "r", encoding="utf-8") as f:
text = f.read()
if len(text) > MAX_CONTENT_CHARS:
text = text[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[TXT 파일: {os.path.basename(path)}]**\n\n{text}"
except Exception as e:
return f"TXT 읽기 실패 ({os.path.basename(path)}): {str(e)}"
def pdf_to_markdown(pdf_path: str) -> str:
"""PDF를 마크다운으로 변환"""
text_chunks = []
try:
with open(pdf_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
max_pages = min(5, len(reader.pages))
for page_num in range(max_pages):
page = reader.pages[page_num]
page_text = page.extract_text() or ""
page_text = page_text.strip()
if page_text:
if len(page_text) > MAX_CONTENT_CHARS // max_pages:
page_text = page_text[:MAX_CONTENT_CHARS // max_pages] + "...(중략)"
text_chunks.append(f"## 페이지 {page_num+1}\n\n{page_text}\n")
if len(reader.pages) > max_pages:
text_chunks.append(f"\n...({max_pages}/{len(reader.pages)} 페이지 표시)...")
except Exception as e:
return f"PDF 읽기 실패 ({os.path.basename(pdf_path)}): {str(e)}"
full_text = "\n".join(text_chunks)
if len(full_text) > MAX_CONTENT_CHARS:
full_text = full_text[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[PDF 파일: {os.path.basename(pdf_path)}]**\n\n{full_text}"
##############################################################################
# 이미지를 base64로 변환
##############################################################################
def image_to_base64_data_uri(image: Union[np.ndarray, Image.Image]) -> str:
"""이미지를 base64 data URI로 변환"""
if isinstance(image, np.ndarray):
image = Image.fromarray(image).convert('RGB')
buffered = BytesIO()
image.save(buffered, format="JPEG", quality=85)
img_str = base64.b64encode(buffered.getvalue()).decode()
return f"data:image/jpeg;base64,{img_str}"
##############################################################################
# 모델 로드
##############################################################################
def download_model_files():
"""Hugging Face Hub에서 모델 파일 다운로드"""
# 여러 가능한 저장소 시도
model_repos = [
# 첫 번째 시도: 일반적인 Gemma 3 4B GGUF
{
"repo": "Mungert/gemma-3-4b-it-gguf",
"model": "google_gemma-3-4b-it-q4_k_m.gguf",
"mmproj": "google_gemma-3-4b-it-mmproj-bf16.gguf"
},
# 두 번째 시도: LM Studio 버전
{
"repo": "lmstudio-community/gemma-3-4b-it-GGUF",
"model": "gemma-3-4b-it-Q4_K_M.gguf",
"mmproj": "gemma-3-4b-it-mmproj-f16.gguf"
},
# 세 번째 시도: unsloth 버전
{
"repo": "unsloth/gemma-3-4b-it-GGUF",
"model": "gemma-3-4b-it.Q4_K_M.gguf",
"mmproj": "gemma-3-4b-it.mmproj.gguf"
}
]
for repo_info in model_repos:
try:
logger.info(f"저장소 시도: {repo_info['repo']}")
# 메인 모델 다운로드
model_filename = repo_info["model"]
logger.info(f"모델 다운로드 중: {model_filename}")
model_path = hf_hub_download(
repo_id=repo_info["repo"],
filename=model_filename,
resume_download=True,
local_files_only=False
)
# Vision projection 파일 다운로드
mmproj_filename = repo_info["mmproj"]
logger.info(f"Vision 모델 다운로드 중: {mmproj_filename}")
try:
mmproj_path = hf_hub_download(
repo_id=repo_info["repo"],
filename=mmproj_filename,
resume_download=True,
local_files_only=False
)
except:
# mmproj 파일이 없을 수도 있음
logger.warning(f"Vision 모델을 찾을 수 없습니다: {mmproj_filename}")
logger.warning("텍스트 전용 모드로 진행합니다.")
mmproj_path = None
logger.info(f"✅ 모델 다운로드 성공!")
logger.info(f"모델 경로: {model_path}")
if mmproj_path:
logger.info(f"Vision 경로: {mmproj_path}")
return model_path, mmproj_path
except Exception as e:
logger.error(f"저장소 {repo_info['repo']} 시도 실패: {e}")
continue
# 모든 시도가 실패한 경우
raise Exception("사용 가능한 GGUF 모델을 찾을 수 없습니다. 인터넷 연결을 확인하세요.")
@spaces.GPU(duration=120)
def load_model():
global llm, model_loaded
if model_loaded:
logger.info("모델이 이미 로드되어 있습니다.")
return True
try:
logger.info("Gemma3-4B GGUF Q4_K_M 모델 로딩 시작...")
clear_cuda_cache()
# 모델 파일 다운로드
model_path, mmproj_path = download_model_files()
# GPU 사용 가능 여부 확인
n_gpu_layers = -1 if torch.cuda.is_available() else 0
# 채팅 핸들러 생성 (비전 지원 - mmproj가 있는 경우만)
chat_handler = None
if mmproj_path:
try:
chat_handler = Llava16ChatHandler(
clip_model_path=mmproj_path,
verbose=False
)
logger.info("✅ Vision 모델 로드 성공")
except Exception as e:
logger.warning(f"Vision 모델 로드 실패, 텍스트 전용 모드로 전환: {e}")
chat_handler = None
# 모델 로드
llm_params = {
"model_path": model_path,
"n_ctx": 4096, # 컨텍스트 크기
"n_gpu_layers": n_gpu_layers, # GPU 레이어
"n_threads": 8, # CPU 스레드
"verbose": False,
"seed": 42,
}
# chat_handler가 있으면 추가
if chat_handler:
llm_params["chat_handler"] = chat_handler
llm_params["logits_all"] = True # 비전 모델에 필요
llm = Llama(**llm_params)
model_loaded = True
logger.info(f"✅ Gemma3-4B 모델 로딩 완료!")
if not chat_handler:
logger.warning("⚠️ 텍스트 전용 모드로 실행 중입니다. 이미지 분석이 제한될 수 있습니다.")
return True
except Exception as e:
logger.error(f"모델 로딩 실패: {e}")
import traceback
logger.error(traceback.format_exc())
return False
##############################################################################
# 채팅 템플릿 포맷팅
##############################################################################
def format_chat_prompt(system_prompt: str, user_prompt: str, image_uri: Optional[str] = None) -> List[Dict]:
"""Gemma 스타일 채팅 프롬프트 생성"""
messages = []
# 시스템 메시지
messages.append({
"role": "system",
"content": system_prompt
})
# 사용자 메시지
user_content = []
if image_uri:
user_content.append({
"type": "image_url",
"image_url": {"url": image_uri}
})
user_content.append({
"type": "text",
"text": user_prompt
})
messages.append({
"role": "user",
"content": user_content
})
return messages
##############################################################################
# 이미지 분석 (로봇 태스크 중심)
##############################################################################
@spaces.GPU(duration=60)
def analyze_image_for_robot(
image: Union[np.ndarray, Image.Image],
prompt: str,
task_type: str = "general",
use_web_search: bool = False,
enable_thinking: bool = False,
max_new_tokens: int = 300
) -> str:
"""로봇 작업을 위한 이미지 분석"""
global llm
if not model_loaded:
if not load_model():
return "❌ 모델 로딩 실패"
try:
# Vision 모델이 없는 경우 경고
if not hasattr(llm, 'chat_handler') or llm.chat_handler is None:
logger.warning("Vision 모델이 로드되지 않았습니다. 텍스트 기반 분석만 가능합니다.")
# 텍스트 전용 분석
system_prompt = f"""당신은 로봇 시각 시스템 시뮬레이터입니다.
실제 이미지를 볼 수는 없지만, 사용자의 설명을 바탕으로 로봇 작업을 계획하고 분석합니다.
태스크 유형: {task_type}"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"[이미지 분석 요청] {prompt}"}
]
response = llm.create_chat_completion(
messages=messages,
max_tokens=max_new_tokens,
temperature=0.7,
top_p=0.9,
stream=False
)
result = response['choices'][0]['message']['content'].strip()
return f"⚠️ 텍스트 전용 모드\n\n{result}"
# 이미지를 base64로 변환
image_uri = image_to_base64_data_uri(image)
# 태스크별 시스템 프롬프트 구성
system_prompts = {
"general": "당신은 로봇 시각 시스템입니다. 먼저 장면을 1-2줄로 설명하고, 핵심 내용을 간결하게 분석하세요.",
"planning": """당신은 로봇 작업 계획 AI입니다.
먼저 장면 이해를 1-2줄로 설명하고, 그 다음 작업 계획을 작성하세요.
형식:
[장면 이해] 현재 보이는 장면을 1-2줄로 설명
[작업 계획]
Step_1: xxx
Step_2: xxx
Step_n: xxx""",
"grounding": "당신은 객체 위치 시스템입니다. 먼저 보이는 객체들을 한 줄로 설명하고, 요청된 객체 위치를 [x1, y1, x2, y2]로 반환하세요.",
"affordance": "당신은 파지점 분석 AI입니다. 먼저 대상 객체를 한 줄로 설명하고, 파지 영역을 [x1, y1, x2, y2]로 반환하세요.",
"trajectory": "당신은 경로 계획 AI입니다. 먼저 환경을 한 줄로 설명하고, 경로를 [(x1,y1), (x2,y2), ...]로 제시하세요.",
"pointing": "당신은 지점 지정 시스템입니다. 먼저 참조점들을 한 줄로 설명하고, 위치를 [(x1,y1), (x2,y2), ...]로 반환하세요."
}
system_prompt = system_prompts.get(task_type, system_prompts["general"])
# Chain-of-Thought 추가 (선택적)
if enable_thinking:
system_prompt += "\n\n추론 과정을
⚡ 양자화 모델로 더 빠르고 효율적인 로봇 작업 분석!