Spaces:
Sleeping
Sleeping
| import tempfile | |
| import random | |
| import requests | |
| import json | |
| import logging | |
| from PIL import Image | |
| from huggingface_hub import HfApi | |
| from cozepy import Coze, TokenAuth | |
| import hashlib | |
| import os | |
| from app.config import DATASET_ID, COZE_API_TOKEN, HUGGING_FACE_TOKEN | |
# Configure logging for the whole process (timestamp, level, message).
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Initialize API clients.
api = HfApi()
# The Coze client is currently disabled:
# coze = Coze(auth=TokenAuth(token=COZE_API_TOKEN), base_url="https://api.coze.cn")
def calculate_image_hash(Image: Image.Image) -> str:
    """Return the MD5 hex digest of an image's raw bytes.

    Args:
        Image: The image to hash; only its ``tobytes()`` output is used.
            NOTE: this parameter name shadows the module-level ``Image``
            import inside the function body. It is kept as-is because
            callers may pass it by keyword.

    Returns:
        str: The MD5 hash as a hexadecimal string.
    """
    raw_bytes = Image.tobytes()
    digest = hashlib.md5()
    digest.update(raw_bytes)
    return digest.hexdigest()
def get_image_description(image_url: str) -> str:
    """Return a text description for the image at ``image_url``.

    The Coze workflow call that used to produce the description is disabled
    (see the commented-out reference code below), so this function currently
    always returns an empty string. That is the same value the workflow code
    returns when it has no data, and it honors the declared ``str`` return
    type instead of implicitly returning ``None``.

    Args:
        image_url: Publicly reachable URL of the image to describe.

    Returns:
        str: The image description; ``""`` while the workflow is disabled.
    """
    # Disabled implementation, kept for reference:
    # logger.info(f"Get image description")
    # workflow = coze.workflows.runs.create(
    #     workflow_id='7479742935953752091',
    #     parameters={
    #         "image_url": image_url
    #     }
    # )
    # logger.info(f"Image description: {workflow.data}")
    # if (workflow.data):
    #     description = json.loads(workflow.data)['output']
    #     return description
    # else:
    #     return ""
    return ""
def save_image_temp(image: Image.Image) -> str:
    """Save an image as a PNG in a new temporary file and return its path.

    The file is created with ``delete=False``, so the caller owns it and is
    responsible for removing it once it is no longer needed. If saving
    fails, the partially created file is removed before the exception
    propagates, so failed calls do not leave stray files behind.

    Args:
        image: The image to write; its ``save(path, "PNG")`` method is used.

    Returns:
        str: Path of the temporary ``.png`` file.

    Raises:
        Exception: Whatever ``image.save`` raises is re-raised unchanged.
    """
    # Reserve a unique name, then close the handle before writing by name.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_img:
        temp_path = temp_img.name

    try:
        image.save(temp_path, "PNG")
    except BaseException:
        # delete=False means nobody else will clean this up for us.
        try:
            os.unlink(temp_path)
        except OSError:
            pass
        raise
    return temp_path
def upload_to_huggingface(temp_file_path: str) -> tuple:
    """Upload a local image file to the HuggingFace dataset repository.

    The file is stored under ``images/`` with a freshly generated name. The
    name is built from a UUID rather than a 4-digit random number: with only
    9000 possible names, repeated uploads would soon collide and silently
    overwrite an earlier image in the dataset.

    Args:
        temp_file_path: Path of the local file to upload.

    Returns:
        tuple: ``(file_path, image_filename)`` where ``file_path`` is the
        path inside the dataset repo (``images/<image_filename>``).
    """
    import uuid  # local import keeps this block self-contained

    image_filename = f"image_{uuid.uuid4().hex}.png"
    file_path = f"images/{image_filename}"
    logger.info(f"Uploading image to HuggingFace: {file_path}")
    api.upload_file(
        path_or_fileobj=temp_file_path,
        path_in_repo=file_path,
        repo_id=DATASET_ID,
        token=HUGGING_FACE_TOKEN,
        repo_type="dataset",
    )
    logger.info(f"Image uploaded successfully: {file_path}")
    return file_path, image_filename
def upload_folder_to_huggingface(folder_path: str) -> None:
    """Upload a local folder to the HuggingFace dataset repository.

    The folder is uploaded to ``images/`` in the dataset identified by
    ``DATASET_ID``, authenticating with ``HUGGING_FACE_TOKEN``.

    Args:
        folder_path: Path of the local folder to upload.
    """
    logger.info(f"Uploading folder to HuggingFace: {folder_path}")
    upload_options = {
        "folder_path": folder_path,
        "path_in_repo": "images/",
        "repo_id": DATASET_ID,
        "token": HUGGING_FACE_TOKEN,
        "repo_type": "dataset",
    }
    api.upload_folder(**upload_options)
    logger.info(f"Image uploaded successfully: {folder_path}")
def get_image_cdn_url(file_path: str) -> str:
    """Resolve the final URL of a dataset file after following redirects.

    Sends a HEAD request (10 second timeout, redirects followed) to the
    mirror's ``resolve/main`` URL for ``file_path`` and returns the URL of
    the final response. The response status code is not checked.

    Args:
        file_path: Path of the file inside the dataset repository.

    Returns:
        str: The URL reported by the final response.
    """
    source_url = f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}"
    logger.info(f"Getting CDN URL for: {source_url}")

    request_headers = {'User-Agent': 'NekoAI'}
    head_response = requests.head(
        source_url,
        headers=request_headers,
        timeout=10,
        allow_redirects=True,
    )

    resolved_url = head_response.url
    logger.info(f"CDN URL: {resolved_url}")
    return resolved_url
def format_image_url(file_path: str) -> str:
    """Format an image location for display.

    Args:
        file_path (str): Image path; either a local path or a path inside
            the HuggingFace dataset repository.

    Returns:
        str: The absolute local path when ``file_path`` exists on disk,
        otherwise the full mirror URL for the dataset file.
    """
    is_local_file = os.path.exists(file_path)
    if not is_local_file:
        # Not on disk: treat it as a path inside the HuggingFace dataset.
        return f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}"
    # Local file: hand back its absolute path unchanged otherwise.
    return os.path.abspath(file_path)
def generate_temp_image(temp_dir: str, image: Image.Image, image_filename: str) -> str:
    """Save an image as a PNG file inside a temporary directory.

    Directory preparation and image saving are guarded separately. In
    Python 3 ``IOError`` is an alias of ``OSError``, so a single
    ``except OSError`` around both steps would report a failed save as a
    directory failure and raise ``ValueError`` instead of the documented
    ``IOError``.

    Args:
        temp_dir (str): Path of the temporary directory; created if missing.
        image (Image.Image): PIL image object to save.
        image_filename (str): Image file name, in the form image_XXXXXX.png.

    Returns:
        str: Path of the temporary image file.

    Raises:
        ValueError: If the temporary directory cannot be created or accessed.
        IOError: If saving the image fails.
    """
    # Step 1: make sure the target directory exists.
    try:
        if not os.path.isdir(temp_dir):
            # exist_ok=True tolerates another process creating the directory
            # between the check above and this call.
            os.makedirs(temp_dir, exist_ok=True)
            logger.info(f"Created temporary directory: {temp_dir}")
    except OSError as e:
        logger.error(f"Failed to create temporary directory: {str(e)}")
        raise ValueError(f"Failed to create temporary directory: {str(e)}") from e

    # Step 2: build the file path and write the image.
    try:
        temp_file_path = os.path.join(temp_dir, image_filename)
        image.save(temp_file_path, "PNG")
    except Exception as e:
        logger.error(f"Failed to generate temporary image: {str(e)}")
        raise IOError(f"Failed to generate temporary image: {str(e)}") from e

    logger.info(f"Generated temporary image: {temp_file_path}")
    return temp_file_path