"""Google Drive persistence helpers for a Streamlit application.

Provides a retry decorator for transient SSL/connection failures and a
``GoogleDriveManager`` class that lists, downloads and uploads files inside
a single Drive folder (optionally one level of subfolder, written as
``"subfolder/file.ext"``).
"""

import functools
import io
import json
import os
import ssl
import tempfile
import time
from typing import Any, Dict, List, Optional, Tuple

import streamlit as st
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseUpload

# OAuth scope requested for every credential built in this module.
DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive']

# MIME type Google Drive uses to mark a folder.
FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'


def retry_on_ssl_error(max_retries=3, delay=1):
    """Decorator factory that retries a call on SSL/connection errors.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
        delay: Base delay in seconds. The wait before the next attempt is
            ``delay * attempt_number`` (linear backoff).

    Returns:
        A decorator. The wrapped function re-raises the last
        ``ssl.SSLError`` / ``ConnectionError`` / ``OSError`` once the
        attempts are exhausted; any other exception propagates immediately
        without a retry.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            attempts = max(1, max_retries)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except (ssl.SSLError, ConnectionError, OSError) as e:
                    if attempt == attempts - 1:
                        print(f"SSL/Connection error after {attempts} attempts: {e}")
                        raise
                    print(f"SSL/Connection error (attempt {attempt + 1}/{attempts}): {e}")
                    time.sleep(delay * (attempt + 1))  # linear backoff
        return wrapper
    return decorator


class GoogleDriveManager:
    """Read and write files in one Google Drive folder.

    File names may be plain (``"config.json"``) or qualified with a single
    subfolder (``"subfolder/config.json"``); only the first ``/`` is treated
    as a separator.
    """

    def __init__(self):
        """Set up local state and make sure the temp directory exists."""
        self.service = None
        self.folder_id = None
        # The SPACE_ID environment variable selects the Hugging Face code path.
        self.is_huggingface = os.getenv('SPACE_ID') is not None
        self.temp_dir = "/tmp/wedding_data" if self.is_huggingface else "temp_data"

        # Ensure temp directory exists
        os.makedirs(self.temp_dir, exist_ok=True)

    def initialize(self, folder_id: Optional[str] = None):
        """Initialize the Google Drive service and set the folder ID.

        Args:
            folder_id: Drive folder to operate on. When omitted, the
                ``GOOGLE_DRIVE_FOLDER_ID`` environment variable is used.

        Returns:
            ``True`` when authentication set-up did not raise and a folder
            ID is known, ``False`` otherwise (an error is shown via
            Streamlit).
        """
        try:
            if self.is_huggingface:
                self._setup_huggingface_auth()
            else:
                self._setup_local_auth()

            if folder_id:
                self.folder_id = folder_id
            else:
                # Try to get folder ID from environment
                self.folder_id = os.getenv('GOOGLE_DRIVE_FOLDER_ID')

            if not self.folder_id:
                st.error(
                    "Google Drive folder ID not found. "
                    "Please set GOOGLE_DRIVE_FOLDER_ID environment variable."
                )
                return False

            return True
        except Exception as e:
            st.error(f"Failed to initialize Google Drive: {str(e)}")
            return False

    def _setup_huggingface_auth(self):
        """Build the Drive service from service-account environment variables.

        Raises:
            ValueError: If any required credential variable is unset/empty.
        """
        client_email = os.getenv('GOOGLE_CLIENT_EMAIL')
        service_account_info = {
            "type": "service_account",
            "project_id": os.getenv('GOOGLE_PROJECT_ID'),
            "private_key_id": os.getenv('GOOGLE_PRIVATE_KEY_ID'),
            # Turn literal "\n" sequences in the variable into real newlines.
            "private_key": os.getenv('GOOGLE_PRIVATE_KEY', '').replace('\\n', '\n'),
            "client_email": client_email,
            "client_id": os.getenv('GOOGLE_CLIENT_ID'),
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "client_x509_cert_url": f"https://www.googleapis.com/robot/v1/metadata/x509/{client_email}"
        }

        # Validate that all required fields are present
        required_fields = ['project_id', 'private_key_id', 'private_key', 'client_email', 'client_id']
        missing_fields = [field for field in required_fields if not service_account_info.get(field)]

        if missing_fields:
            raise ValueError(f"Missing Google service account credentials: {missing_fields}")

        credentials = service_account.Credentials.from_service_account_info(
            service_account_info,
            scopes=DRIVE_SCOPES
        )

        self.service = build('drive', 'v3', credentials=credentials)

    def _setup_local_auth(self):
        """Build the Drive service from a local service-account file.

        Reads the path from ``GOOGLE_SERVICE_ACCOUNT_PATH``. When the
        variable is unset or the file does not exist, a warning is shown and
        ``self.service`` is left as ``None``.
        """
        service_account_path = os.getenv('GOOGLE_SERVICE_ACCOUNT_PATH')

        if service_account_path and os.path.exists(service_account_path):
            credentials = service_account.Credentials.from_service_account_file(
                service_account_path,
                scopes=DRIVE_SCOPES
            )
            self.service = build('drive', 'v3', credentials=credentials)
        else:
            st.warning("Google service account file not found. Using local data only.")
            self.service = None

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for a Drive ``q`` string literal."""
        return value.replace('\\', '\\\\').replace("'", "\\'")

    def _find_files(self, name: str, parent_id: str) -> List[Dict[str, Any]]:
        """Return metadata of non-trashed files called ``name`` directly under ``parent_id``."""
        query = (
            f"name='{self._escape_query_value(name)}' "
            f"and '{parent_id}' in parents and trashed=false"
        )
        return self.service.files().list(q=query).execute().get('files', [])

    def _find_subfolder_id(self, folder_name: str) -> Optional[str]:
        """Return the ID of the first matching subfolder of the root folder, or ``None``."""
        query = (
            f"name='{self._escape_query_value(folder_name)}' "
            f"and '{self.folder_id}' in parents and trashed=false "
            f"and mimeType='{FOLDER_MIME_TYPE}'"
        )
        folders = self.service.files().list(q=query).execute().get('files', [])
        return folders[0]['id'] if folders else None

    def _locate_files(self, file_name: str) -> List[Dict[str, Any]]:
        """Resolve a plain or ``subfolder/name`` path to matching file metadata."""
        if '/' not in file_name:
            return self._find_files(file_name, self.folder_id)

        folder_name, actual_file_name = file_name.split('/', 1)
        parent_id = self._find_subfolder_id(folder_name)
        if parent_id is None:
            return []
        return self._find_files(actual_file_name, parent_id)

    @retry_on_ssl_error(max_retries=3, delay=1)
    def list_files(self) -> List[Dict[str, Any]]:
        """List all files directly inside the Google Drive folder.

        Follows ``nextPageToken`` so results beyond the first page are
        included.

        Returns:
            A list of file resources with ``id``, ``name``, ``modifiedTime``
            and ``size``; an empty list when offline or on an HTTP error.
        """
        if not self.service or not self.folder_id:
            return []

        try:
            query = f"'{self.folder_id}' in parents and trashed=false"
            collected: List[Dict[str, Any]] = []
            page_token = None
            while True:
                response = self.service.files().list(
                    q=query,
                    fields="nextPageToken, files(id, name, modifiedTime, size)",
                    pageToken=page_token
                ).execute()
                collected.extend(response.get('files', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    return collected
        except HttpError as e:
            st.error(f"Error listing files: {str(e)}")
            return []

    @retry_on_ssl_error(max_retries=3, delay=1)
    def download_file(self, file_name: str) -> Optional[Any]:
        """Download a file from Google Drive and return its content.

        Args:
            file_name: Plain name or ``subfolder/name`` path.

        Returns:
            The parsed JSON value when the content is valid JSON, otherwise
            the content as a UTF-8 string. ``None`` when offline, when the
            file is not found, or on an HTTP error.
        """
        if not self.service or not self.folder_id:
            return None

        try:
            files = self._locate_files(file_name)
            if not files:
                st.warning(f"File '{file_name}' not found in Google Drive")
                return None

            # Download file content
            raw = self.service.files().get_media(fileId=files[0]['id']).execute()
            text = raw.decode('utf-8')

            # Try to parse as JSON; fall back to the plain text
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                return text

        except HttpError as e:
            st.error(f"Error downloading file '{file_name}': {str(e)}")
            return None

    @staticmethod
    def _encode_content(file_name: str, content: Any) -> Tuple[bytes, str]:
        """Serialize ``content`` to UTF-8 bytes and pick a MIME type from the extension."""
        is_yaml = file_name.endswith(('.yaml', '.yml'))

        if isinstance(content, (dict, list)):
            if is_yaml:
                # Imported lazily so PyYAML is only needed for YAML uploads.
                import yaml
                text = yaml.dump(content, default_flow_style=False, sort_keys=False)
            else:
                text = json.dumps(content, indent=2)
        else:
            text = str(content)

        mimetype = 'text/yaml' if is_yaml else 'application/json'
        return text.encode('utf-8'), mimetype

    @retry_on_ssl_error(max_retries=3, delay=1)
    def _upload_bytes(self, file_name: str, payload: bytes, mimetype: str) -> None:
        """Create or update ``file_name`` with ``payload``; retried on connection errors."""
        target_folder_id = self.folder_id
        actual_file_name = file_name

        if '/' in file_name:
            folder_name, actual_file_name = file_name.split('/', 1)
            target_folder_id = self._find_subfolder_id(folder_name)

            if target_folder_id is None:
                # Create the subfolder if it doesn't exist
                folder_metadata = {
                    'name': folder_name,
                    'mimeType': FOLDER_MIME_TYPE,
                    'parents': [self.folder_id]
                }
                created_folder = self.service.files().create(
                    body=folder_metadata,
                    fields='id'
                ).execute()
                target_folder_id = created_folder.get('id')

        # A fresh stream per attempt so a retry starts from byte 0.
        media = MediaIoBaseUpload(io.BytesIO(payload), mimetype=mimetype, resumable=True)

        existing_files = self._find_files(actual_file_name, target_folder_id)
        if existing_files:
            # Update existing file
            self.service.files().update(
                fileId=existing_files[0]['id'],
                media_body=media
            ).execute()
        else:
            # Create new file
            file_metadata = {
                'name': actual_file_name,
                'parents': [target_folder_id]
            }
            self.service.files().create(
                body=file_metadata,
                media_body=media
            ).execute()

    def upload_file(self, file_name: str, content: Any) -> bool:
        """Upload a file to Google Drive, replacing an existing one of the same name.

        Dicts and lists are serialized as YAML for ``.yaml``/``.yml`` names
        and as JSON otherwise; any other value is uploaded as ``str(content)``.
        A missing subfolder in a ``subfolder/name`` path is created.

        The retry lives on the inner ``_upload_bytes`` call: this method
        catches every exception, so a decorator placed here would never see
        a connection error to retry.

        Returns:
            ``True`` on success, ``False`` when offline or on any error
            (the error is shown via Streamlit).
        """
        if not self.service or not self.folder_id:
            return False

        try:
            payload, mimetype = self._encode_content(file_name, content)
            self._upload_bytes(file_name, payload, mimetype)
            return True
        except HttpError as e:
            st.error(f"Error uploading file '{file_name}': {str(e)}")
            return False
        except Exception as e:
            st.error(f"Unexpected error uploading file '{file_name}': {str(e)}")
            return False

    def sync_from_drive(self, file_names: List[str]) -> Dict[str, Any]:
        """Download several files and mirror them into the temp directory.

        Args:
            file_names: Names (plain or ``subfolder/name``) to download.

        Returns:
            Mapping of each successfully downloaded name to its content.
        """
        synced_files = {}

        for file_name in file_names:
            content = self.download_file(file_name)
            if content is None:
                continue

            synced_files[file_name] = content

            # Save to local temp directory; create the subfolder part of a
            # "subfolder/name" path first so open() does not fail.
            local_path = os.path.join(self.temp_dir, file_name)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, 'w', encoding='utf-8') as f:
                if isinstance(content, (dict, list)):
                    json.dump(content, f, indent=2)
                else:
                    f.write(str(content))

        return synced_files

    def sync_to_drive(self, file_names: List[str], local_data: Dict[str, Any]) -> bool:
        """Upload several files to Google Drive.

        Names missing from ``local_data`` are skipped. Every remaining file
        is attempted even after a failure.

        Returns:
            ``True`` only if every attempted upload succeeded.
        """
        success = True

        for file_name in file_names:
            if file_name not in local_data:
                continue
            if not self.upload_file(file_name, local_data[file_name]):
                success = False

        return success

    @retry_on_ssl_error(max_retries=3, delay=1)
    def get_file_info(self, file_name: str) -> Optional[Dict[str, Any]]:
        """Get metadata for a specific file.

        Accepts the same plain or ``subfolder/name`` paths as
        ``download_file`` and ``upload_file``.

        Returns:
            The first matching file resource, or ``None`` when offline, not
            found, or on an HTTP error.
        """
        if not self.service or not self.folder_id:
            return None

        try:
            files = self._locate_files(file_name)
            return files[0] if files else None
        except HttpError as e:
            st.error(f"Error getting file info for '{file_name}': {str(e)}")
            return None

    def is_online(self) -> bool:
        """Return ``True`` when both the Drive service and a folder ID are set."""
        return self.service is not None and self.folder_id is not None