# jobsearch-mcp-server / src/services/embedding_service.py
# Author: daniielyan
# Commit 4fd18a2: Implement core functionality for Job Search MCP Server,
# including user profile management, job search, cover letter generation, and
# Q&A response tools. Add configuration and service layers, and establish
# dependency management with uv. Introduce .gitignore and .python-version
# files for environment setup.
"""Embedding service for text embeddings and similarity search using GPU."""
import json
import os
from typing import Any, Dict, List, Optional, Tuple
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from ..config import get_settings
class EmbeddingService:
    """Service for text embeddings and similarity search over job postings.

    Wraps a SentenceTransformer model for encoding text and a FAISS
    inner-product index for nearest-neighbour search. All vectors are
    L2-normalized before being added or queried, so the inner-product
    scores returned by the index are cosine similarities. The index and
    its per-job metadata are persisted at ``settings.embeddings_cache_path``
    (metadata in a sibling ``*_metadata.json`` file).
    """

    def __init__(self):
        self.settings = get_settings()
        # Both stay None/empty if loading fails, so callers degrade gracefully.
        self.model = None
        self.index = None
        # Maps str(vector position in the FAISS index) -> stored job metadata.
        self.job_metadata = {}
        self._load_model()
        self._load_index()

    def _metadata_path(self) -> str:
        """Return the path of the metadata JSON stored alongside the index."""
        return self.settings.embeddings_cache_path.replace(".faiss", "_metadata.json")

    @staticmethod
    def _normalize(embeddings: np.ndarray) -> np.ndarray:
        """L2-normalize row vectors, guarding against zero-norm rows.

        A zero vector (e.g. from an empty text) would otherwise produce
        NaNs via 0/0; those rows are left as zeros instead.
        """
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return embeddings / norms

    def _load_model(self):
        """Load the sentence transformer model, moving it to GPU when available."""
        try:
            self.model = SentenceTransformer(self.settings.embedding_model)
            # Use GPU if available.
            if hasattr(self.model, "to"):
                import torch

                device = "cuda" if torch.cuda.is_available() else "cpu"
                self.model = self.model.to(device)
                print(f"Embedding model loaded on: {device}")
        except Exception as e:
            # Best-effort: the service stays usable, embedding calls return None.
            print(f"Error loading embedding model: {e}")
            self.model = None

    def _load_index(self):
        """Load the cached FAISS index and metadata, or create a fresh index."""
        try:
            if os.path.exists(self.settings.embeddings_cache_path):
                self.index = faiss.read_index(self.settings.embeddings_cache_path)
                # Load metadata persisted alongside the index, if present.
                metadata_path = self._metadata_path()
                if os.path.exists(metadata_path):
                    with open(metadata_path, "r", encoding="utf-8") as f:
                        self.job_metadata = json.load(f)
                print(f"Loaded FAISS index with {self.index.ntotal} vectors")
            else:
                # Inner-product index; vectors are normalized, so scores are cosine.
                self.index = faiss.IndexFlatIP(self.settings.embedding_dimension)
                print("Created new FAISS index")
        except Exception as e:
            print(f"Error loading FAISS index: {e}")
            self.index = faiss.IndexFlatIP(self.settings.embedding_dimension)

    def _save_index(self):
        """Persist the FAISS index and its metadata JSON to disk."""
        try:
            os.makedirs(
                os.path.dirname(self.settings.embeddings_cache_path), exist_ok=True
            )
            faiss.write_index(self.index, self.settings.embeddings_cache_path)
            # default=str keeps non-JSON-native values (dates etc.) serializable.
            with open(self._metadata_path(), "w", encoding="utf-8") as f:
                json.dump(self.job_metadata, f, indent=2, default=str)
            print(f"Saved FAISS index with {self.index.ntotal} vectors")
        except Exception as e:
            print(f"Error saving FAISS index: {e}")

    def get_embedding(self, text: str) -> Optional[np.ndarray]:
        """Return a normalized embedding for one text, or None on failure."""
        if not self.model:
            return None
        try:
            embedding = self.model.encode([text])
            # Normalize for cosine similarity (using an inner-product index).
            return self._normalize(embedding)[0]
        except Exception as e:
            print(f"Error generating embedding: {e}")
            return None

    def get_embeddings(self, texts: List[str]) -> Optional[np.ndarray]:
        """Return normalized embeddings for many texts, or None on failure."""
        if not self.model:
            return None
        try:
            embeddings = self.model.encode(texts, show_progress_bar=True)
            return self._normalize(embeddings)
        except Exception as e:
            print(f"Error generating embeddings: {e}")
            return None

    def add_job_embeddings(self, jobs: List[Dict[str, Any]]):
        """Embed the given jobs, add them to the index, and persist both.

        Each job's title, description, and requirements are concatenated
        into the text that gets embedded. A truncated copy of the job's
        fields is stored in ``job_metadata`` keyed by the vector's
        position in the index.
        """
        if not jobs:
            return
        # Combine title, description, and requirements for the embedding text.
        texts = [
            f"{job.get('title', '')} {job.get('description', '')} "
            f"{job.get('requirements', '')}"
            for job in jobs
        ]
        embeddings = self.get_embeddings(texts)
        if embeddings is None:
            return
        # Positions of the new vectors are [start, start + len(jobs)).
        start = self.index.ntotal
        self.index.add(embeddings.astype("float32"))
        for i, job in enumerate(jobs):
            position = start + i
            self.job_metadata[str(position)] = {
                # Fall back to the vector position so jobs without an "id"
                # still get distinct identifiers within the batch.
                "job_id": job.get("id", position),
                "title": job.get("title", ""),
                "company": job.get("company", ""),
                "location": job.get("location", ""),
                "salary": job.get("salary", ""),
                "url": job.get("url", ""),
                "posted_date": job.get("posted_date", ""),
                "job_type": job.get("job_type", ""),
                "description": job.get("description", "")[:500],  # Truncate for storage
            }
        self._save_index()
        print(f"Added {len(jobs)} job embeddings to index")

    def search_similar_jobs(
        self, profile_text: str, k: int = 20
    ) -> List[Tuple[Dict[str, Any], float]]:
        """Search for jobs similar to the user profile.

        Args:
            profile_text: Combined user profile text for matching.
            k: Number of top results to return.

        Returns:
            List of (job_metadata, similarity_score) tuples, best first;
            empty if the index is empty or embedding fails.
        """
        if not self.index or self.index.ntotal == 0:
            return []
        profile_embedding = self.get_embedding(profile_text)
        if profile_embedding is None:
            return []
        try:
            scores, indices = self.index.search(
                profile_embedding[np.newaxis].astype("float32"),
                min(k, self.index.ntotal),
            )
            results = []
            for score, idx in zip(scores[0], indices[0]):
                if idx == -1:  # FAISS pads missing results with -1.
                    continue
                job_meta = self.job_metadata.get(str(idx))
                if job_meta:
                    results.append((job_meta, float(score)))
            return results
        except Exception as e:
            print(f"Error searching similar jobs: {e}")
            return []

    def clear_index(self):
        """Reset the index and metadata to empty, persisting the empty state."""
        self.index = faiss.IndexFlatIP(self.settings.embedding_dimension)
        self.job_metadata = {}
        self._save_index()
        print("Cleared job embeddings index")

    def get_index_stats(self) -> Dict[str, Any]:
        """Return basic statistics about the index and its configuration."""
        return {
            "total_jobs": self.index.ntotal if self.index else 0,
            "embedding_dimension": self.settings.embedding_dimension,
            "model_name": self.settings.embedding_model,
            "metadata_count": len(self.job_metadata),
        }