#!/usr/bin/env python3
import os
import re
import tempfile
import gc
from collections.abc import Iterator
from threading import Thread
import json
import requests
import cv2
import gradio as gr
import spaces
import torch
import numpy as np
from loguru import logger
from PIL import Image
from transformers import AutoProcessor, Gemma3ForConditionalGeneration, TextIteratorStreamer
import time
import warnings
from typing import Dict, List, Optional, Union
# CSV/TXT 분석
import pandas as pd
# PDF 텍스트 추출
import PyPDF2
warnings.filterwarnings('ignore')
print("🎮 로봇 시각 시스템 초기화 (Gemma3-R1984-4B)...")
##############################################################################
# 상수 정의
##############################################################################
MAX_CONTENT_CHARS = 2000
MAX_INPUT_LENGTH = 2096
MAX_NUM_IMAGES = 5
SERPHOUSE_API_KEY = os.getenv("SERPHOUSE_API_KEY", "")
##############################################################################
# 전역 변수
##############################################################################
model = None
processor = None
model_loaded = False
model_name = "Gemma3-R1984-4B"
##############################################################################
# 메모리 관리
##############################################################################
def clear_cuda_cache():
"""CUDA 캐시를 명시적으로 비웁니다."""
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
##############################################################################
# 키워드 추출 함수
##############################################################################
def extract_keywords(text: str, top_k: int = 5) -> str:
"""키워드 추출"""
text = re.sub(r"[^a-zA-Z0-9가-힣\s]", "", text)
tokens = text.split()
seen = set()
unique_tokens = []
for token in tokens:
if token not in seen and len(token) > 1:
seen.add(token)
unique_tokens.append(token)
key_tokens = unique_tokens[:top_k]
return " ".join(key_tokens)
##############################################################################
# 웹 검색 함수
##############################################################################
def do_web_search(query: str) -> str:
"""SerpHouse API를 사용한 웹 검색"""
try:
url = "https://api.serphouse.com/serp/live"
params = {
"q": query,
"domain": "google.com",
"serp_type": "web",
"device": "desktop",
"lang": "ko", # 한국어 우선
"num": "10" # 10개로 제한
}
headers = {
"Authorization": f"Bearer {SERPHOUSE_API_KEY}"
}
logger.info(f"웹 검색 중... 검색어: {query}")
response = requests.get(url, headers=headers, params=params, timeout=60)
response.raise_for_status()
data = response.json()
results = data.get("results", {})
organic = results.get("organic", []) if isinstance(results, dict) else []
if not organic:
return "검색 결과를 찾을 수 없습니다."
max_results = min(10, len(organic))
limited_organic = organic[:max_results]
summary_lines = []
for idx, item in enumerate(limited_organic, start=1):
title = item.get("title", "제목 없음")
link = item.get("link", "#")
snippet = item.get("snippet", "설명 없음")
displayed_link = item.get("displayed_link", link)
summary_lines.append(
f"### 결과 {idx}: {title}\n\n"
f"{snippet}\n\n"
f"**출처**: [{displayed_link}]({link})\n\n"
f"---\n"
)
instructions = """# 웹 검색 결과
아래는 검색 결과입니다. 답변 시 이 정보를 활용하세요:
1. 각 결과의 제목, 내용, 출처 링크를 참조하세요
2. 관련 출처를 명시적으로 인용하세요
3. 여러 출처의 정보를 종합하여 답변하세요
"""
search_results = instructions + "\n".join(summary_lines)
return search_results
except Exception as e:
logger.error(f"웹 검색 실패: {e}")
return f"웹 검색 실패: {str(e)}"
##############################################################################
# 문서 처리 함수
##############################################################################
def analyze_csv_file(path: str) -> str:
"""CSV 파일 분석"""
try:
df = pd.read_csv(path)
if df.shape[0] > 50 or df.shape[1] > 10:
df = df.iloc[:50, :10]
df_str = df.to_string()
if len(df_str) > MAX_CONTENT_CHARS:
df_str = df_str[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[CSV 파일: {os.path.basename(path)}]**\n\n{df_str}"
except Exception as e:
return f"CSV 읽기 실패 ({os.path.basename(path)}): {str(e)}"
def analyze_txt_file(path: str) -> str:
"""TXT 파일 분석"""
try:
with open(path, "r", encoding="utf-8") as f:
text = f.read()
if len(text) > MAX_CONTENT_CHARS:
text = text[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[TXT 파일: {os.path.basename(path)}]**\n\n{text}"
except Exception as e:
return f"TXT 읽기 실패 ({os.path.basename(path)}): {str(e)}"
def pdf_to_markdown(pdf_path: str) -> str:
"""PDF를 마크다운으로 변환"""
text_chunks = []
try:
with open(pdf_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
max_pages = min(5, len(reader.pages))
for page_num in range(max_pages):
page = reader.pages[page_num]
page_text = page.extract_text() or ""
page_text = page_text.strip()
if page_text:
if len(page_text) > MAX_CONTENT_CHARS // max_pages:
page_text = page_text[:MAX_CONTENT_CHARS // max_pages] + "...(중략)"
text_chunks.append(f"## 페이지 {page_num+1}\n\n{page_text}\n")
if len(reader.pages) > max_pages:
text_chunks.append(f"\n...({max_pages}/{len(reader.pages)} 페이지 표시)...")
except Exception as e:
return f"PDF 읽기 실패 ({os.path.basename(pdf_path)}): {str(e)}"
full_text = "\n".join(text_chunks)
if len(full_text) > MAX_CONTENT_CHARS:
full_text = full_text[:MAX_CONTENT_CHARS] + "\n...(중략)..."
return f"**[PDF 파일: {os.path.basename(pdf_path)}]**\n\n{full_text}"
##############################################################################
# 모델 로드
##############################################################################
@spaces.GPU(duration=120)
def load_model():
global model, processor, model_loaded
if model_loaded:
logger.info("모델이 이미 로드되어 있습니다.")
return True
try:
logger.info("Gemma3-R1984-4B 모델 로딩 시작...")
clear_cuda_cache()
model_id = os.getenv("MODEL_ID", "VIDraft/Gemma-3-R1984-4B")
processor = AutoProcessor.from_pretrained(model_id, padding_side="left")
model = Gemma3ForConditionalGeneration.from_pretrained(
model_id,
device_map="auto",
torch_dtype=torch.bfloat16,
attn_implementation="eager"
)
model_loaded = True
logger.info(f"✅ {model_name} 로딩 완료!")
return True
except Exception as e:
logger.error(f"모델 로딩 실패: {e}")
return False
##############################################################################
# 이미지 분석 (로봇 태스크 중심)
##############################################################################
@spaces.GPU(duration=60)
def analyze_image_for_robot(
image: Union[np.ndarray, Image.Image],
prompt: str,
task_type: str = "general",
use_web_search: bool = False,
enable_thinking: bool = False, # 기본값 False로 변경
max_new_tokens: int = 300 # 장면 설명을 위해 300으로 증가
) -> str:
"""로봇 작업을 위한 이미지 분석"""
global model, processor
if not model_loaded:
if not load_model():
return "❌ 모델 로딩 실패"
try:
# numpy 배열을 PIL 이미지로 변환
if isinstance(image, np.ndarray):
image = Image.fromarray(image).convert('RGB')
# 태스크별 시스템 프롬프트 구성 (더 간결하게)
system_prompts = {
"general": "당신은 로봇 시각 시스템입니다. 먼저 장면을 1-2줄로 설명하고, 핵심 내용을 간결하게 분석하세요.",
"planning": """당신은 로봇 작업 계획 AI입니다.
먼저 장면 이해를 1-2줄로 설명하고, 그 다음 작업 계획을 작성하세요.
형식:
[장면 이해] 현재 보이는 장면을 1-2줄로 설명
[작업 계획]
Step_1: xxx
Step_2: xxx
Step_n: xxx""",
"grounding": "당신은 객체 위치 시스템입니다. 먼저 보이는 객체들을 한 줄로 설명하고, 요청된 객체 위치를 [x1, y1, x2, y2]로 반환하세요.",
"affordance": "당신은 파지점 분석 AI입니다. 먼저 대상 객체를 한 줄로 설명하고, 파지 영역을 [x1, y1, x2, y2]로 반환하세요.",
"trajectory": "당신은 경로 계획 AI입니다. 먼저 환경을 한 줄로 설명하고, 경로를 [(x1,y1), (x2,y2), ...]로 제시하세요.",
"pointing": "당신은 지점 지정 시스템입니다. 먼저 참조점들을 한 줄로 설명하고, 위치를 [(x1,y1), (x2,y2), ...]로 반환하세요."
}
system_prompt = system_prompts.get(task_type, system_prompts["general"])
# Chain-of-Thought 추가 (선택적)
if enable_thinking:
system_prompt += "\n\n추론 과정을 태그 안에 작성 후 최종 답변을 제시하세요. 장면 이해는 추론 과정과 별도로 반드시 포함하세요."
# 웹 검색 수행
combined_system = system_prompt
if use_web_search:
keywords = extract_keywords(prompt, top_k=5)
if keywords:
logger.info(f"웹 검색 키워드: {keywords}")
search_results = do_web_search(keywords)
combined_system = f"{search_results}\n\n{system_prompt}"
# 메시지 구성
messages = [
{
"role": "system",
"content": [{"type": "text", "text": combined_system}]
},
{
"role": "user",
"content": [
{"type": "image", "url": image},
{"type": "text", "text": prompt}
]
}
]
# 입력 처리
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
).to(device=model.device, dtype=torch.bfloat16)
# 입력 토큰 수 제한
if inputs.input_ids.shape[1] > MAX_INPUT_LENGTH:
inputs.input_ids = inputs.input_ids[:, -MAX_INPUT_LENGTH:]
if 'attention_mask' in inputs:
inputs.attention_mask = inputs.attention_mask[:, -MAX_INPUT_LENGTH:]
# 생성
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=processor.tokenizer.pad_token_id,
eos_token_id=processor.tokenizer.eos_token_id,
)
# 입력 토큰 제거하여 출력만 추출
generated_tokens = outputs[0][inputs.input_ids.shape[1]:]
# 디코딩
response = processor.decode(generated_tokens, skip_special_tokens=True).strip()
# 프롬프트 제거 및 정리
# 이미 입력 토큰을 제거했으므로 추가 정리만 수행
response = response.strip()
# 혹시 남아있는 불필요한 텍스트 제거
if response.startswith("model\n"):
response = response[6:].strip()
elif response.startswith("model"):
response = response[5:].strip()
return response
except Exception as e:
logger.error(f"이미지 분석 오류: {e}")
import traceback
return f"❌ 분석 오류: {str(e)}\n{traceback.format_exc()}"
finally:
clear_cuda_cache()
##############################################################################
# 문서 분석 (스트리밍)
##############################################################################
def _model_gen_with_oom_catch(**kwargs):
"""OOM 처리를 위한 생성 함수"""
global model
try:
model.generate(**kwargs)
except torch.cuda.OutOfMemoryError:
raise RuntimeError("GPU 메모리 부족. Max Tokens를 줄여주세요.")
finally:
clear_cuda_cache()
@spaces.GPU(duration=120)
def analyze_documents_streaming(
files: List[str],
prompt: str,
use_web_search: bool = False,
max_new_tokens: int = 2048
) -> Iterator[str]:
"""문서 분석 (스트리밍)"""
global model, processor
if not model_loaded:
if not load_model():
yield "❌ 모델 로딩 실패"
return
try:
# 시스템 프롬프트
system_content = "당신은 문서를 분석하고 요약하는 전문 AI입니다."
# 웹 검색
if use_web_search:
keywords = extract_keywords(prompt, top_k=5)
if keywords:
search_results = do_web_search(keywords)
system_content = f"{search_results}\n\n{system_content}"
# 문서 내용 처리
doc_contents = []
for file_path in files:
if file_path.lower().endswith('.csv'):
content = analyze_csv_file(file_path)
elif file_path.lower().endswith('.txt'):
content = analyze_txt_file(file_path)
elif file_path.lower().endswith('.pdf'):
content = pdf_to_markdown(file_path)
else:
continue
doc_contents.append(content)
# 메시지 구성
messages = [
{
"role": "system",
"content": [{"type": "text", "text": system_content}]
},
{
"role": "user",
"content": [
{"type": "text", "text": "\n\n".join(doc_contents) + f"\n\n{prompt}"}
]
}
]
# 입력 처리
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
).to(device=model.device, dtype=torch.bfloat16)
# 스트리밍 설정
streamer = TextIteratorStreamer(processor, timeout=30.0, skip_prompt=True, skip_special_tokens=True)
gen_kwargs = dict(
inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
temperature=0.8,
top_p=0.9,
)
# 별도 스레드에서 생성
t = Thread(target=_model_gen_with_oom_catch, kwargs=gen_kwargs)
t.start()
# 스트리밍 출력
output = ""
for new_text in streamer:
output += new_text
yield output
except Exception as e:
logger.error(f"문서 분석 오류: {e}")
yield f"❌ 오류 발생: {str(e)}"
finally:
clear_cuda_cache()
##############################################################################
# Gradio UI (로봇 시각화 중심)
##############################################################################
css = """
.robot-header {
text-align: center;
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 50%, #667eea 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.status-box {
text-align: center;
padding: 10px;
border-radius: 5px;
margin: 10px 0;
font-weight: bold;
}
.info-box {
background: #f0f0f0;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
border-left: 4px solid #2a5298;
}
.task-button {
min-height: 60px;
font-size: 1.1em;
}
.webcam-container {
border: 3px solid #2a5298;
border-radius: 10px;
padding: 10px;
background: #f8f9fa;
}
.auto-capture-status {
text-align: center;
padding: 5px;
border-radius: 5px;
margin: 5px 0;
font-weight: bold;
background: #e8f5e9;
color: #2e7d32;
}
"""
with gr.Blocks(title="🤖 로봇 시각 시스템 (Gemma3-4B)", css=css) as demo:
gr.HTML("""
""")
with gr.Row():
# 왼쪽: 웹캠 및 입력
with gr.Column(scale=1):
gr.Markdown("### 📷 실시간 웹캠")
with gr.Group(elem_classes="webcam-container"):
webcam = gr.Image(
sources=["webcam"],
streaming=True,
type="numpy",
label="실시간 스트리밍",
height=350
)
# 자동 캡처 상태 표시
auto_capture_status = gr.HTML(
'🔄 자동 캡처: 대기 중
'
)
# 캡처된 이미지 표시
captured_image = gr.Image(
label="캡처된 이미지",
height=200,
visible=False
)
# 로봇 작업 버튼들
gr.Markdown("### 🎯 로봇 작업 선택")
with gr.Row():
capture_btn = gr.Button("📸 수동 캡처", variant="primary", elem_classes="task-button")
clear_capture_btn = gr.Button("🗑️ 초기화", elem_classes="task-button")
with gr.Row():
auto_capture_toggle = gr.Checkbox(
label="🔄 자동 캡처 활성화 (10초마다)",
value=False,
info="활성화 시 10초마다 자동으로 캡처 및 분석"
)
with gr.Row():
planning_btn = gr.Button("📋 작업 계획", elem_classes="task-button")
grounding_btn = gr.Button("📍 객체 위치", elem_classes="task-button")
with gr.Row():
affordance_btn = gr.Button("🤏 파지점 분석", elem_classes="task-button")
trajectory_btn = gr.Button("🛤️ 경로 계획", elem_classes="task-button")
# 오른쪽: 분석 설정 및 결과
with gr.Column(scale=2):
gr.Markdown("### ⚙️ 분석 설정")
with gr.Row():
with gr.Column():
task_prompt = gr.Textbox(
label="작업 설명 / 질문",
placeholder="예: 테이블 위의 컵을 잡아서 싱크대에 놓기",
value="현재 장면을 분석하고 로봇이 수행할 수 있는 작업을 제안하세요.",
lines=2
)
with gr.Row():
use_web_search = gr.Checkbox(
label="🔍 웹 검색 사용",
value=False,
info="관련 정보를 웹에서 검색합니다"
)
enable_thinking = gr.Checkbox(
label="🤔 추론 과정 표시",
value=False, # 기본값 False로 변경
info="Chain-of-Thought 추론 과정을 보여줍니다"
)
max_tokens = gr.Slider(
label="최대 토큰 수",
minimum=100,
maximum=4096,
value=300, # 장면 설명을 위해 300으로 증가
step=50
)
gr.Markdown("### 📊 분석 결과")
result_output = gr.Textbox(
label="AI 분석 결과",
lines=20,
max_lines=40,
show_copy_button=True,
elem_id="result"
)
status_display = gr.HTML(
'🎮 시스템 준비 완료
'
)
# 문서 분석 탭 (숨김 처리)
with gr.Tab("📄 문서 분석", visible=False): # visible=False로 숨김
with gr.Row():
with gr.Column():
doc_files = gr.File(
label="문서 업로드",
file_count="multiple",
file_types=[".pdf", ".csv", ".txt"],
type="filepath"
)
doc_prompt = gr.Textbox(
label="분석 요청",
placeholder="예: 이 문서들의 핵심 내용을 요약하고 비교 분석하세요.",
lines=3
)
doc_web_search = gr.Checkbox(
label="🔍 웹 검색 사용",
value=False
)
analyze_docs_btn = gr.Button("📊 문서 분석", variant="primary")
with gr.Column():
doc_result = gr.Textbox(
label="분석 결과",
lines=25,
max_lines=50
)
# 이벤트 핸들러
webcam_state = gr.State(None)
auto_capture_state = gr.State({"enabled": False, "timer": None})
def capture_webcam(frame):
"""웹캠 프레임 캡처"""
if frame is None:
return None, None, '❌ 웹캠 프레임 없음
'
return frame, gr.update(value=frame, visible=True), '✅ 이미지 캡처 완료
'
def clear_capture():
"""캡처 초기화"""
return None, gr.update(visible=False), '🎮 시스템 준비 완료
'
def analyze_with_task(image, prompt, task_type, use_search, thinking, tokens):
"""특정 태스크로 이미지 분석"""
if image is None:
return "❌ 먼저 이미지를 캡처하세요.", '❌ 이미지 없음
'
status = f'🚀 {task_type} 분석 중...
'
result = analyze_image_for_robot(
image=image,
prompt=prompt,
task_type=task_type,
use_web_search=use_search,
enable_thinking=thinking,
max_new_tokens=tokens
)
# 결과 포맷팅 (더 간결하게)
timestamp = time.strftime("%H:%M:%S")
task_names = {
"planning": "작업 계획",
"grounding": "객체 위치",
"affordance": "파지점",
"trajectory": "경로 계획"
}
formatted_result = f"""🤖 {task_names.get(task_type, '분석')} 결과 ({timestamp})
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
{result}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"""
complete_status = '✅ 분석 완료!
'
return formatted_result, complete_status
# 자동 캡처 및 분석 함수
def auto_capture_and_analyze(webcam_frame, task_prompt, use_search, thinking, tokens, auto_state):
"""자동 캡처 및 분석"""
if webcam_frame is None:
return (
None,
"자동 캡처 대기 중...",
'⏳ 웹캠 대기 중
',
'🔄 자동 캡처: 웹캠 대기 중
'
)
# 캡처 수행
timestamp = time.strftime("%H:%M:%S")
# 이미지 분석 (작업 계획 모드로)
result = analyze_image_for_robot(
image=webcam_frame,
prompt=task_prompt,
task_type="planning",
use_web_search=use_search,
enable_thinking=thinking,
max_new_tokens=tokens
)
formatted_result = f"""🔄 자동 분석 완료 ({timestamp})
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
{result}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"""
return (
webcam_frame,
formatted_result,
'✅ 자동 분석 완료
',
f'🔄 자동 캡처: 마지막 분석 {timestamp}
'
)
# 웹캠 스트리밍
webcam.stream(
fn=lambda x: x,
inputs=[webcam],
outputs=[webcam_state]
)
# 수동 캡처 버튼
capture_btn.click(
fn=capture_webcam,
inputs=[webcam_state],
outputs=[webcam_state, captured_image, status_display]
)
# 초기화 버튼
clear_capture_btn.click(
fn=clear_capture,
outputs=[webcam_state, captured_image, status_display]
)
# 작업 버튼들
planning_btn.click(
fn=lambda img, p, s, t, tk: analyze_with_task(img, p, "planning", s, t, tk),
inputs=[captured_image, task_prompt, use_web_search, enable_thinking, max_tokens],
outputs=[result_output, status_display]
)
grounding_btn.click(
fn=lambda img, p, s, t, tk: analyze_with_task(img, p, "grounding", s, t, tk),
inputs=[captured_image, task_prompt, use_web_search, enable_thinking, max_tokens],
outputs=[result_output, status_display]
)
affordance_btn.click(
fn=lambda img, p, s, t, tk: analyze_with_task(img, p, "affordance", s, t, tk),
inputs=[captured_image, task_prompt, use_web_search, enable_thinking, max_tokens],
outputs=[result_output, status_display]
)
trajectory_btn.click(
fn=lambda img, p, s, t, tk: analyze_with_task(img, p, "trajectory", s, t, tk),
inputs=[captured_image, task_prompt, use_web_search, enable_thinking, max_tokens],
outputs=[result_output, status_display]
)
# 문서 분석
def analyze_docs(files, prompt, use_search):
if not files:
return "❌ 문서를 업로드하세요."
output = ""
for chunk in analyze_documents_streaming(files, prompt, use_search):
output = chunk
return output
analyze_docs_btn.click(
fn=analyze_docs,
inputs=[doc_files, doc_prompt, doc_web_search],
outputs=[doc_result]
)
# 자동 캡처 타이머 (10초마다)
timer = gr.Timer(10.0, active=False) # 10초 타이머, 초기에는 비활성화
# 자동 캡처 토글 이벤트
def toggle_auto_capture(enabled):
if enabled:
return gr.Timer(10.0, active=True), '🔄 자동 캡처: 활성화됨 (10초마다)
'
else:
return gr.Timer(active=False), '🔄 자동 캡처: 비활성화됨
'
auto_capture_toggle.change(
fn=toggle_auto_capture,
inputs=[auto_capture_toggle],
outputs=[timer, auto_capture_status]
)
# 타이머 틱 이벤트
timer.tick(
fn=auto_capture_and_analyze,
inputs=[webcam_state, task_prompt, use_web_search, enable_thinking, max_tokens, auto_capture_state],
outputs=[captured_image, result_output, status_display, auto_capture_status]
)
# 초기 모델 로드
def initial_load():
load_model()
return "시스템 준비 완료! 🚀"
demo.load(
fn=initial_load,
outputs=None
)
if __name__ == "__main__":
print("🚀 로봇 시각 시스템 시작 (Gemma3-R1984-4B)...")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=False
)