Spaces:
Paused
Paused
| from flask import ( | |
| Flask, | |
| json, | |
| render_template, | |
| request, | |
| jsonify, | |
| redirect, | |
| url_for, | |
| send_file, | |
| Response | |
| ) | |
| from io import BytesIO | |
| import urllib.parse | |
| from functools import wraps | |
| import requests | |
| import hashlib | |
| import os | |
| from config import Config | |
| from models import Database | |
| from utils import get_file_type, format_file_size | |
| from huggingface_hub import HfApi | |
# Initialize the HuggingFace API client with the configured token.
api = HfApi(token=Config.HF_TOKEN)
# Flask application object; the secret key comes from Config.
app = Flask(__name__)
app.config['SECRET_KEY'] = Config.SECRET_KEY
# Shared database handle; the handlers below use db.conn and db.search_files.
db = Database()
def require_auth(f):
    """Decorator that restricts a view to authenticated clients.

    When ``Config.REQUIRE_LOGIN`` is falsy the wrapped view is called
    directly. Otherwise the request must carry a truthy ``authenticated``
    cookie; without it, JSON requests receive a 401 JSON error and every
    other request is redirected to the ``login`` endpoint.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s name and docstring.
    """
    # Flask derives the endpoint name from the view's __name__; without
    # wraps() every protected view would be registered as "decorated".
    @wraps(f)
    def decorated(*args, **kwargs):
        if not Config.REQUIRE_LOGIN:
            return f(*args, **kwargs)
        if not request.cookies.get('authenticated'):
            if request.is_json:
                return jsonify({'error': 'Unauthorized'}), 401
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated
def login():
    """Render the login page, or check a submitted password on POST.

    On POST the ``password`` form field is compared with
    ``Config.ACCESS_PASSWORD``. A match returns a JSON success response
    that sets the ``authenticated`` cookie; anything else returns a 401
    JSON error. Any other method renders ``login.html``.
    """
    if request.method == 'POST':
        import hmac  # local import: only this handler needs it

        submitted = request.form.get('password')
        expected = Config.ACCESS_PASSWORD
        # A missing form field must never match an unset password
        # (None == None), and the comparison should not leak timing.
        password_ok = (
            submitted is not None
            and isinstance(expected, str)
            and hmac.compare_digest(
                submitted.encode('utf-8'), expected.encode('utf-8')
            )
        )
        if password_ok:
            response = jsonify({'success': True})
            response.set_cookie('authenticated', 'true', secure=True, httponly=True)
            return response
        return jsonify({'error': 'Invalid password'}), 401
    return render_template('login.html')
def logout():
    """Drop the ``authenticated`` cookie and redirect to the login endpoint."""
    resp = redirect(url_for('login'))
    resp.delete_cookie('authenticated')
    return resp
def index():
    """Return the rendered ``index.html`` template."""
    page = render_template('index.html')
    return page
def list_files(directory=''):
    """List the entries of a directory in the configured dataset.

    Calls the mirror's ``tree`` API for ``Config.HF_DATASET_ID`` at
    ``Config.HF_BRANCH`` and returns the JSON entries. Entries whose
    ``type`` is ``'file'`` are annotated with ``file_type``,
    ``size_formatted``, ``preview_url`` and ``download_url``.

    Args:
        directory: Path inside the dataset; empty for the root.

    Returns:
        A JSON response with the entries, the upstream status code with an
        error payload when the upstream call fails, or a 500 JSON error on
        any exception.
    """
    try:
        url = f"https://huggingface.co/api/datasets/{Config.HF_DATASET_ID}/tree/{Config.HF_BRANCH}"
        if directory:
            # Percent-encode the path ('/' stays literal) so characters such
            # as '#' or '?' in a name do not truncate the upstream URL.
            # NOTE(review): assumes `directory` arrives already URL-decoded.
            url = f"{url}/{urllib.parse.quote(directory)}"
        response = requests.get(
            url,
            headers={'Authorization': f'Bearer {Config.HF_TOKEN}'},
            # Without a timeout a stalled upstream blocks this worker forever.
            timeout=30,
        )
        if not response.ok:
            return jsonify({'error': 'Failed to fetch files', 'details': response.text}), response.status_code
        files = response.json()
        for file in files:
            if file['type'] == 'file':
                file['file_type'] = get_file_type(file['path'])
                file['size_formatted'] = format_file_size(file['size'])
                # Add preview and download URLs.
                file['preview_url'] = f"/api/files/preview/{file['path']}"
                file['download_url'] = f"/api/files/download/{file['path']}"
        return jsonify(files)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def preview_file(filepath):
    """Stream a dataset file to the client for inline preview.

    Only files that ``get_file_type`` classifies as ``image``, ``video`` or
    ``document`` are served; other types get a 400 JSON error.

    Args:
        filepath: Path of the file inside the dataset.

    Returns:
        A streaming response carrying the upstream content type, a JSON
        error with the upstream status code when the fetch fails, or a 500
        JSON error on any exception.
    """
    try:
        file_type = get_file_type(filepath)
        if file_type not in ['image', 'video', 'document']:
            return jsonify({'error': 'File type not supported for preview'}), 400
        # Percent-encode the path ('/' stays literal) so '#' or '?' in a
        # file name does not truncate the upstream URL.
        url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{urllib.parse.quote(filepath)}"
        response = requests.get(
            url,
            headers={'Authorization': f'Bearer {Config.HF_TOKEN}'},
            stream=True,
            # Without a timeout a stalled upstream blocks this worker forever.
            timeout=30,
        )
        if not response.ok:
            status = response.status_code
            # A streamed response holds its connection until closed.
            response.close()
            return jsonify({'error': 'Failed to fetch file'}), status

        def generate():
            try:
                for chunk in response.iter_content(chunk_size=8192):
                    yield chunk
            finally:
                # Release the upstream connection once streaming ends or the
                # client disconnects mid-stream.
                response.close()

        return Response(
            generate(),
            mimetype=response.headers.get('content-type', 'application/octet-stream'),
            direct_passthrough=True
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def download_file(filepath):
    """Stream a dataset file to the client as an attachment.

    A HEAD request collects the content type, length, last-modified and
    ETag headers; a streamed GET then supplies the body. The file name is
    sent as an RFC 5987 ``filename*`` parameter so non-ASCII names survive.

    Args:
        filepath: Path of the file inside the dataset.

    Returns:
        A streaming attachment response, a 404 JSON error when either
        upstream request is not ok, or a 500 JSON error on any exception.
    """
    try:
        # Percent-encode the path ('/' stays literal) so '#' or '?' in a
        # file name does not truncate the upstream URL.
        url = f"https://{Config.PROXY_DOMAIN}/datasets/{Config.HF_DATASET_ID}/resolve/{Config.HF_BRANCH}/{urllib.parse.quote(filepath)}"
        auth_headers = {'Authorization': f'Bearer {Config.HF_TOKEN}'}
        # Send a HEAD request first to obtain the file metadata.
        head_response = requests.head(
            url,
            headers=auth_headers,
            allow_redirects=True,
            timeout=30,
        )
        if head_response.ok:
            # Basic file information.
            content_type = head_response.headers.get('content-type', 'application/octet-stream')
            content_length = head_response.headers.get('content-length')
            last_modified = head_response.headers.get('last-modified')
            etag = head_response.headers.get('etag')
            # A .txt file whose content type carries no charset is served as
            # plain text.
            if filepath.lower().endswith('.txt') and 'charset' not in content_type:
                content_type = 'text/plain'
            # Fetch the file content.
            response = requests.get(
                url,
                headers=auth_headers,
                stream=True,
                # Without a timeout a stalled upstream blocks this worker.
                timeout=30,
            )
            if response.ok:
                filename = os.path.basename(filepath)
                encoded_filename = urllib.parse.quote(filename.encode('utf-8'))
                headers = {
                    'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
                    'Content-Type': content_type,
                    'Content-Length': content_length,
                    'Accept-Ranges': 'bytes',
                    'Cache-Control': 'no-cache',
                    'Last-Modified': last_modified,
                    'ETag': etag
                }
                # Drop headers whose value is None.
                headers = {k: v for k, v in headers.items() if v is not None}

                def generate():
                    try:
                        yield from response.iter_content(chunk_size=1048576)
                    finally:
                        # Release the upstream connection once streaming
                        # ends or the client disconnects mid-stream.
                        response.close()

                return Response(generate(), headers=headers)
            # A streamed response holds its connection until closed.
            response.close()
        return jsonify({'error': 'File not found'}), 404
    except Exception as e:
        # Log with traceback through the app logger instead of stdout.
        app.logger.exception("Download error for %s: %s", filepath, e)
        return jsonify({'error': str(e)}), 500
def upload_file():
    """Upload the submitted file to the dataset and record it in the database.

    Reads the ``file`` part and the optional ``path`` form field (leading
    and trailing slashes stripped), uploads the bytes to the dataset under
    ``path/filename`` and inserts a row into the ``files`` table.

    Returns:
        ``{'success': True}`` on success, a 400 JSON error when no file (or
        an empty file name) is submitted, or a 500 JSON error when the
        upload reports failure or any exception is raised.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    # An empty file name would make the repo path resolve to the directory
    # itself, so reject it up front.
    if not file.filename:
        return jsonify({'error': 'No file provided'}), 400
    current_path = request.form.get('path', '').strip('/')
    try:
        file_content = file.read()
        original_name = file.filename
        stored_name = original_name
        # Repo paths always use forward slashes, whatever the host OS.
        full_path = os.path.join(current_path, stored_name).replace("\\", "/")
        response = api.upload_file(
            path_or_fileobj=file_content,
            path_in_repo=full_path,
            repo_id=Config.HF_DATASET_ID,
            repo_type="dataset",
            token=Config.HF_TOKEN
        )
        if response:
            try:
                with db.conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO files (
                            original_name, stored_name, file_path,
                            file_type, file_size
                        ) VALUES (%s, %s, %s, %s, %s)
                    """, (
                        original_name,
                        stored_name,
                        full_path,
                        get_file_type(original_name),
                        len(file_content)
                    ))
                db.conn.commit()
            except Exception:
                # db.conn is shared by every handler; do not leave a failed
                # transaction open on it.
                # NOTE(review): assumes db.conn is a DB-API connection.
                db.conn.rollback()
                raise
            return jsonify({'success': True})
        return jsonify({'error': 'Upload failed'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def search_files():
    """Return the files that ``db.search_files`` finds for ``keyword``.

    The keyword comes from the ``keyword`` query parameter; an empty
    keyword yields an empty JSON list. Each hit is reported with its path,
    type, raw and formatted size, and preview/download URLs. Any exception
    becomes a 500 JSON error.
    """
    keyword = request.args.get('keyword', '')
    if not keyword:
        return jsonify([])
    try:
        results = []
        for record in db.search_files(keyword):
            path = record['file_path']
            size = record['file_size']
            results.append({
                'type': 'file',
                'path': path,
                'file_type': get_file_type(path),
                'size': size,
                'size_formatted': format_file_size(size),
                'preview_url': f'/api/files/preview/{path}',
                'download_url': f'/api/files/download/{path}',
            })
        return jsonify(results)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def delete_file(filepath):
    """Delete a file from the dataset and remove its database record.

    Args:
        filepath: Path of the file inside the dataset.

    Returns:
        ``{'success': True}`` on success, or a 500 JSON error carrying the
        exception message on failure.
    """
    try:
        # Use the module-level client, as upload_file does, rather than
        # building a new HfApi per request.
        api.delete_file(
            path_in_repo=filepath,
            repo_id=Config.HF_DATASET_ID,
            repo_type="dataset"
        )
        # Delete the file record from the database.
        try:
            with db.conn.cursor() as cursor:
                cursor.execute(
                    "DELETE FROM files WHERE file_path = %s",
                    [filepath]
                )
            db.conn.commit()
        except Exception:
            # db.conn is shared by every handler; do not leave a failed
            # transaction open on it.
            # NOTE(review): assumes db.conn is a DB-API connection.
            db.conn.rollback()
            raise
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def create_folder():
    """Create a folder in the dataset by uploading an empty ``.keep`` file.

    Expects a JSON object with ``name`` (required) and ``path`` (optional
    parent directory). The placeholder is uploaded to
    ``path/name/.keep`` and recorded in the ``files`` table with type
    ``directory`` and size 0.

    Returns:
        A JSON success message, a 400 JSON error when the folder name is
        missing, or a 500 JSON error when the upload reports failure or any
        exception is raised.
    """
    try:
        # A missing, malformed or non-object JSON body is treated as empty
        # so the client gets the 400 below instead of an AttributeError.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        # Strip slashes so the repo path never starts with '/', matching
        # how upload_file normalizes its path.
        path = str(data.get('path') or '').strip('/')
        name = data.get('name')
        if not isinstance(name, str) or not name.strip('/'):
            return jsonify({'error': 'Folder name is required'}), 400
        # A leading '/' in the name would make os.path.join drop `path`.
        name = name.strip('/')
        full_path = os.path.join(path, name, '.keep').replace("\\", "/")
        # Create the folder through the HuggingFace API, using the
        # module-level client as upload_file does.
        response = api.upload_file(
            path_or_fileobj=b'',  # empty content
            path_in_repo=full_path,
            repo_id=Config.HF_DATASET_ID,
            repo_type="dataset",
            token=Config.HF_TOKEN
        )
        if response:
            # Record the placeholder in the database.
            try:
                with db.conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO files (
                            original_name, stored_name, file_path,
                            file_type, file_size
                        ) VALUES (%s, %s, %s, %s, %s)
                    """, (
                        '.keep',
                        '.keep',
                        full_path,
                        'directory',
                        0
                    ))
                db.conn.commit()
            except Exception:
                # db.conn is shared by every handler; do not leave a failed
                # transaction open on it.
                # NOTE(review): assumes db.conn is a DB-API connection.
                db.conn.rollback()
                raise
            return jsonify({'message': 'Folder created successfully'})
        return jsonify({'error': 'Failed to create folder'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary code, so it must not be on by default while listening on
    # all interfaces. Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)