"""Profile service for managing user profiles and data."""

import json
import os
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field

from ..config import get_settings


class UserProfile(BaseModel):
    """User profile data model.

    ``user_id``, ``resume``, ``skills``, ``salary_wish`` and ``career_goals``
    are required. The remaining descriptive fields are optional and default
    to ``None``. Both timestamps default to the moment the model instance is
    created (naive local time from ``datetime.now``).
    """

    user_id: str
    resume: str
    skills: list[str]
    salary_wish: str
    career_goals: str
    experience_level: Optional[str] = None
    location: Optional[str] = None
    education: Optional[str] = None
    certifications: Optional[list[str]] = None
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)


class ProfileService:
    """Service for managing user profiles.

    Profiles are persisted in a single JSON file whose path is read from
    ``settings.profiles_db_path``. The file holds one JSON object mapping
    each user ID to that user's profile fields. Every operation re-reads the
    file, and every mutating operation rewrites it in full.
    """

    def __init__(self):
        self.settings = get_settings()
        self.profiles_path = self.settings.profiles_db_path
        self._ensure_data_dir()

    def _ensure_data_dir(self):
        """Ensure the directory that holds the profiles file exists."""
        data_dir = os.path.dirname(self.profiles_path)
        # dirname is "" when the path is a bare filename (current directory);
        # os.makedirs("") raises FileNotFoundError, so skip it in that case.
        if data_dir:
            os.makedirs(data_dir, exist_ok=True)

    def _load_profiles(self) -> Dict[str, Dict[str, Any]]:
        """Load all profiles from the profiles file.

        Returns:
            Mapping of user ID to stored profile fields. An empty dict is
            returned when the file is missing, is not valid JSON, or does not
            contain a JSON object at the top level.
        """
        try:
            with open(self.profiles_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            return {}
        # Callers use dict operations (.get, `in`, item assignment); a list
        # or scalar at the top level is treated like an unreadable file.
        return loaded if isinstance(loaded, dict) else {}

    def _save_profiles(self, profiles: Dict[str, Dict[str, Any]]):
        """Write all profiles to the profiles file.

        The data is written to a sibling temporary file which then replaces
        the real file, so a failure part-way through serialisation does not
        leave a truncated profiles file behind.

        Args:
            profiles: Mapping of user ID to profile fields. Values that are
                not JSON-serialisable (e.g. ``datetime``) are written via
                ``str()``.
        """
        tmp_path = f"{self.profiles_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(profiles, f, indent=2, default=str)
            os.replace(tmp_path, self.profiles_path)
        except BaseException:
            # Best-effort cleanup of the partial temp file; the original
            # error is what the caller needs to see.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Convert a stored timestamp back to ``datetime``, or ``None`` if unusable."""
        if isinstance(value, datetime):
            return value
        if not isinstance(value, str):
            return None
        try:
            # Accepts both "YYYY-MM-DD HH:MM:SS[.ffffff]" (what str(datetime)
            # produces on save) and the "T"-separated ISO form.
            return datetime.fromisoformat(value)
        except ValueError:
            return None

    def upsert_profile(self, user_id: str, profile_data: str) -> Dict[str, Any]:
        """
        Store or update user profile.

        When a profile already exists for ``user_id`` its ``updated_at`` is
        set to the current time and its original ``created_at`` is kept,
        unless ``profile_data`` itself supplies a ``created_at``.

        Args:
            user_id: Unique user identifier
            profile_data: JSON string containing profile information; must
                decode to a JSON object with at least ``resume``, ``skills``,
                ``salary_wish`` and ``career_goals``

        Returns:
            Dict with operation result. ``success`` is always present; on
            failure ``message`` describes the problem, on success the dict
            also carries ``user_id`` and a ``profile`` summary
            (``skills_count``, ``updated_at``). Errors are reported through
            the returned dict rather than raised.
        """
        try:
            # Parse profile data
            profile_dict = json.loads(profile_data)

            # Valid JSON is not necessarily an object (e.g. "[]" or "42").
            if not isinstance(profile_dict, dict):
                return {
                    "success": False,
                    "message": "Profile data must be a JSON object",
                }

            # Validate required fields
            required_fields = ["resume", "skills", "salary_wish", "career_goals"]
            missing_fields = [
                field for field in required_fields if field not in profile_dict
            ]

            if missing_fields:
                return {
                    "success": False,
                    "message": f"Missing required fields: {', '.join(missing_fields)}",
                }

            # Create profile model; the user_id argument always wins over any
            # user_id embedded in the payload.
            profile_dict["user_id"] = user_id
            profile = UserProfile(**profile_dict)

            # Load existing profiles
            profiles = self._load_profiles()

            if user_id in profiles:
                # Update timestamp if profile exists
                profile.updated_at = datetime.now()

                # The freshly built model carries a brand-new created_at;
                # carry the stored one forward so an update does not reset
                # the creation time.
                stored = profiles[user_id]
                if "created_at" not in profile_dict and isinstance(stored, dict):
                    original_created = self._parse_timestamp(
                        stored.get("created_at")
                    )
                    if original_created is not None:
                        profile.created_at = original_created

            # Store profile
            profiles[user_id] = profile.dict()
            self._save_profiles(profiles)

            return {
                "success": True,
                "message": "Profile updated successfully",
                "user_id": user_id,
                "profile": {
                    "skills_count": len(profile.skills),
                    "updated_at": profile.updated_at.isoformat(),
                },
            }

        except json.JSONDecodeError:
            return {"success": False, "message": "Invalid JSON format in profile data"}
        except Exception as e:
            # Boundary handler: validation and I/O failures are reported to
            # the caller as a result dict instead of propagating.
            return {"success": False, "message": f"Error updating profile: {str(e)}"}

    def get_profile(self, user_id: str) -> Optional[UserProfile]:
        """Get user profile by ID.

        Returns:
            The stored profile as a ``UserProfile``, or ``None`` when no
            (non-empty) entry exists for ``user_id``.
        """
        profiles = self._load_profiles()
        profile_data = profiles.get(user_id)

        if profile_data:
            return UserProfile(**profile_data)
        return None

    def delete_profile(self, user_id: str) -> bool:
        """Delete user profile.

        Returns:
            ``True`` if a profile was found and removed, ``False`` if no
            profile exists for ``user_id``.
        """
        profiles = self._load_profiles()

        if user_id in profiles:
            del profiles[user_id]
            self._save_profiles(profiles)
            return True
        return False

    def list_profiles(self) -> list[str]:
        """List all user IDs."""
        profiles = self._load_profiles()
        return list(profiles.keys())