ssfinder-matching / db_connector.py
asefasdfcv's picture
Update db_connector.py
1a4da72 verified
raw
history blame
4.89 kB
"""
MySQL 데이터베이스 연결 및 습득물 데이터 조회 모듈
"""
import os
import logging
import traceback
import pymysql
from pymysql.cursors import DictCursor
from contextlib import contextmanager
# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 데이터베이스 연결 설정
DB_CONFIG = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': int(os.getenv('DB_PORT', 3306)),
'user': os.getenv('DB_USER', 'username'),
'password': os.getenv('DB_PASSWORD', 'password'),
'db': os.getenv('DB_NAME', 'foundlost'),
'charset': 'utf8mb4',
'cursorclass': DictCursor
}
@contextmanager
def get_db_connection():
"""
데이터베이스 연결을 제공하는 컨텍스트 매니저
Yields:
pymysql.connections.Connection: 데이터베이스 연결 객체
"""
connection = None
try:
connection = pymysql.connect(**DB_CONFIG)
logger.info("데이터베이스 연결 성공")
yield connection
except Exception as e:
logger.error(f"데이터베이스 연결 오류: {str(e)}")
logger.error(traceback.format_exc())
raise
finally:
if connection:
connection.close()
logger.debug("데이터베이스 연결 종료")
async def fetch_found_items(limit=100, offset=0):
"""
MySQL 데이터베이스에서 습득물 데이터를 조회
Args:
limit (int): 조회할 최대 항목 수 (기본값: 100)
offset (int): 조회 시작 위치 (기본값: 0)
Returns:
list: 습득물 데이터 목록
"""
found_items = []
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
# 습득물 데이터 조회 쿼리 (ERD에 맞게 수정)
# name 컬럼을 title로 매핑, detail 컬럼을 content로 매핑
query = """
SELECT f.id, f.user_id, f.item_category_id, f.name as title, f.color,
f.detail as content, f.location, f.image, f.status, f.found_at as lost_at,
f.created_at, f.management_id, f.stored_at,
ic.name as category_name
FROM found_item f
LEFT JOIN item_category ic ON f.item_category_id = ic.id
WHERE f.status = 'STORED'
ORDER BY f.created_at DESC
LIMIT %s OFFSET %s
"""
cursor.execute(query, (limit, offset))
rows = cursor.fetchall()
# 조회 결과를 API 응답 형식에 맞게 변환
for row in rows:
found_item = {
"id": row["id"],
"user_id": row["user_id"],
"item_category_id": row["item_category_id"],
"title": row["title"], # name 컬럼을 title로 매핑
"color": row["color"],
"content": row["content"], # detail 컬럼을 content로 매핑
"location": row["location"],
"image": row["image"],
"status": row["status"],
"lost_at": row["lost_at"],
"stored_at": row["stored_at"],
"management_id": row["management_id"],
"category": row["category_name"] # 카테고리명 추가
}
found_items.append(found_item)
logger.info(f"{len(found_items)}개의 습득물 데이터 조회 완료")
except Exception as e:
logger.error(f"습득물 데이터 조회 중 오류 발생: {str(e)}")
logger.error(traceback.format_exc())
return found_items
# 습득물 데이터 개수 조회 함수
async def count_found_items():
"""
MySQL 데이터베이스에서 습득물 데이터의 총 개수를 조회
Returns:
int: 습득물 데이터 총 개수
"""
try:
with get_db_connection() as connection:
with connection.cursor() as cursor:
# status가 'STORED'인 항목만 조회
query = "SELECT COUNT(*) as total FROM found_item WHERE status = 'STORED'"
cursor.execute(query)
result = cursor.fetchone()
total_count = result["total"]
logger.info(f"총 습득물 데이터 개수: {total_count}")
return total_count
except Exception as e:
logger.error(f"습득물 데이터 개수 조회 중 오류 발생: {str(e)}")
logger.error(traceback.format_exc())
return 0