import os import numpy as np import pandas as pd import json from typing import Dict, List, Optional, Union from openai import OpenAI from pathlib import Path from embedding_cache import EmbeddingCache class SDCClassifier: def __init__(self, openai_api_key: str = None, cache_path: str = "embeddings_cache.db", openai_model = None, # Модель OpenAI за замовчуванням local_model: str = "cambridgeltl/SapBERT-from-PubMedBERT-fulltext", device: str = None): """ Ініціалізація класифікатора SDC Args: openai_api_key: API ключ для OpenAI (опціонально, можна взяти з env) cache_path: шлях до файлу кешу ембедінгів local_model: назва або шлях до локальної моделі device: пристрій для локальної моделі ('cuda', 'cpu' або None) """ self.client = OpenAI(api_key=openai_api_key or os.getenv("OPENAI_API_KEY")) self.local_embedder = None self.using_local = False if local_model: from local_embedder import LocalEmbedder self.local_embedder = LocalEmbedder(local_model, device) self.using_local = True self.classes_json = {} self.class_signatures = None self.df = None self.embeddings = None self.embeddings_mean = None self.embeddings_std = None # Створення директорії для кешу cache_dir = os.path.dirname(cache_path) if cache_dir and not os.path.exists(cache_dir): os.makedirs(cache_dir) # Ініціалізація кешу self.cache = EmbeddingCache(cache_path) # Базовий стан self.base_classes_json = {} self.base_signatures = None def load_initial_state(self, classes_file: str, signatures_file: str) -> str: """ Завантаження початкового стану при старті застосунку Args: classes_file: шлях до файлу з класами signatures_file: шлях до файлу з signatures Returns: str: повідомлення про результат завантаження """ try: self.base_classes_json = self.load_classes(classes_file) if os.path.exists(signatures_file): self.base_signatures = self.load_signatures(signatures_file) # Встановлюємо поточний стан як базовий self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None return f"Завантажено {len(self.base_classes_json)} базових класів" except Exception as e: return f"Помилка при завантаженні базового стану: {str(e)}" def restore_base_state(self) -> None: """Відновлення базового стану""" self.classes_json = self.base_classes_json.copy() self.class_signatures = self.base_signatures.copy() if self.base_signatures else None def load_classes(self, json_path: Union[str, dict]) -> dict: """ Завантаження класів та їх хінтів з JSON файлу або словника Args: json_path: шлях до JSON файлу або словник з класами Returns: dict: словник класів та їх хінтів """ try: if isinstance(json_path, dict): self.classes_json = json_path else: with open(json_path, 'r', encoding='utf-8') as f: self.classes_json = json.load(f) if not all(isinstance(hints, list) for hints in self.classes_json.values()): raise ValueError("Кожен клас повинен мати список хінтів") return self.classes_json except FileNotFoundError: print(f"Файл {json_path} не знайдено!") return {} except json.JSONDecodeError: print(f"Помилка читання JSON з файлу {json_path}!") return {} def save_signatures(self, filename: str = "signatures.npz") -> None: """ Зберігає signatures у NPZ файл Args: filename: шлях до файлу для збереження """ if self.class_signatures: np.savez(filename, **self.class_signatures) def load_signatures(self, filename: str = "signatures.npz") -> Dict[str, np.ndarray]: """ Завантажує signatures з NPZ файлу Args: filename: шлях до файлу з signatures Returns: Dict[str, np.ndarray]: словник signatures """ try: with np.load(filename) as data: self.class_signatures = {key: data[key] for key in data.files} return self.class_signatures except (FileNotFoundError, IOError): return None def set_openai_model(self, model_name: str) -> None: """ Встановлює модель OpenAI для використання Args: model_name: назва моделі OpenAI """ print(f"Встановлення OpenAI моделі: {model_name}") self.using_local = False self.local_embedder = None # Видаляємо локальний ембедер self.openai_model = model_name # Зберігаємо назву моделі def get_embedding(self, text: str, model_name: str = None) -> list: """ Отримання ембедінгу тексту Args: text: текст для ембедінгу model_name: назва моделі (OpenAI) або None для використання поточної Returns: list: ембедінг тексту """ # Перевіряємо кеш model_key = model_name or (self.openai_model if not self.using_local else "local") cached_embedding = self.cache.get(text, model_key) if cached_embedding is not None: return cached_embedding.tolist() # Отримуємо ембедінг if self.using_local: embedding = self.local_embedder.get_embeddings(text)[0] else: response = self.client.embeddings.create( input=text, model=model_name or self.openai_model or "text-embedding-3-large" ) embedding = response.data[0].embedding # Зберігаємо в кеш self.cache.put(text, model_key, embedding) return embedding def get_cache_stats(self) -> dict: """Отримання статистики кешування""" return self.cache.get_stats() def clear_old_cache(self, days: int = 30) -> int: """Очищення старих записів з кешу""" return self.cache.clear_old(days) def embed_hints(self, hint_list: List[str], model_name: str = None) -> np.ndarray: """ Створення ембедінгів для списку хінтів Args: hint_list: список хінтів model_name: назва моделі для ембедінгів Returns: np.ndarray: матриця ембедінгів """ emb_list = [] total_hints = len(hint_list) for idx, hint in enumerate(hint_list, 1): try: print(f" Отримання embedding {idx}/{total_hints}: '{hint}'") emb = self.get_embedding(hint, model_name=model_name) emb_list.append(emb) except Exception as e: print(f" Помилка при отриманні embedding для '{hint}': {str(e)}") continue if not emb_list: raise ValueError("Не вдалося отримати жодного embedding") return np.array(emb_list, dtype=np.float32) def initialize_signatures(self, model_name: str = None, signatures_file: str = "signatures.npz", force_rebuild: bool = False) -> str: """ Ініціалізує signatures: завантажує існуючі або створює нові Args: model_name: назва моделі для ембедінгів signatures_file: шлях до файлу для збереження (None - не зберігати) force_rebuild: примусово перебудувати signatures """ if not self.classes_json: return "Помилка: Не знайдено жодного класу в classes.json" print(f"Знайдено {len(self.classes_json)} класів") # Завантажуємо існуючі signatures if not force_rebuild and signatures_file and os.path.exists(signatures_file): try: loaded_signatures = self.load_signatures(signatures_file) if loaded_signatures and all(cls in loaded_signatures for cls in self.classes_json): self.class_signatures = loaded_signatures print("Успішно завантажено збережені signatures") return f"Завантажено існуючі signatures для {len(self.class_signatures)} класів" except Exception as e: print(f"Помилка при завантаженні signatures: {str(e)}") try: self.class_signatures = {} total_classes = len(self.classes_json) print(f"Починаємо створення нових signatures для {total_classes} класів...") for idx, (cls_name, hints) in enumerate(self.classes_json.items(), 1): if not hints: print(f"Пропускаємо клас {cls_name} - немає хінтів") continue print(f"Обробка класу {cls_name} ({idx}/{total_classes})...") try: # Отримуємо ембедінги для всіх хінтів класу arr = self.embed_hints(hints, model_name=model_name) # Нормалізуємо кожен ембедінг norms = np.linalg.norm(arr, axis=1, keepdims=True) arr = arr / norms # Обчислюємо середній нормалізований ембедінг self.class_signatures[cls_name] = np.mean(arr, axis=0) print(f"Успішно створено signature для {cls_name}") except Exception as e: print(f"Помилка при створенні signature для {cls_name}: {str(e)}") continue if not self.class_signatures: return "Помилка: Не вдалося створити жодного signature" # Зберігаємо signatures if signatures_file: try: self.save_signatures(signatures_file) print("Signatures збережено у файл") except Exception as e: print(f"Помилка при збереженні signatures: {str(e)}") return f"Створено нові signatures для {len(self.class_signatures)} класів" except Exception as e: return f"Помилка при створенні signatures: {str(e)}" def load_data(self, csv_path: str = "messages.csv", emb_path: str = "embeddings.npy") -> str: """ Завантаження даних з CSV та NPY файлів Args: csv_path: шлях до CSV файлу emb_path: шлях до NPY файлу з ембедінгами Returns: str: повідомлення про результат """ self.df = pd.read_csv(csv_path) emb_local = np.load(emb_path) assert len(self.df) == len(emb_local), "CSV і embeddings різної довжини!" self.df["Target"] = "Unlabeled" self.embeddings_mean = emb_local.mean(axis=0) self.embeddings_std = emb_local.std(axis=0) self.embeddings = (emb_local - self.embeddings_mean) / self.embeddings_std return f"Завантажено {len(self.df)} рядків" def predict_classes(self, text_embedding: np.ndarray, threshold: float = 0.0) -> Dict[str, float]: """ Передбачення класів для одного тексту Args: text_embedding: ембедінг тексту threshold: поріг впевненості Returns: Dict[str, float]: словник класів та їх scores """ results = {} for cls, sign in self.class_signatures.items(): score = float(np.dot(text_embedding, sign)) if score > threshold: results[cls] = score return dict(sorted(results.items(), key=lambda x: x[1], reverse=True)) def process_single_text(self, text: str, threshold: float = 0.3) -> dict: """ Обробка одного тексту Args: text: текст для класифікації threshold: поріг впевненості Returns: dict: результати класифікації """ if self.class_signatures is None: return {"error": "Спочатку збудуйте signatures!"} # Отримуємо ембедінг emb = np.array(self.get_embedding(text)) # Нормалізуємо відносно даних навчання, якщо вони доступні if self.embeddings_mean is not None and self.embeddings_std is not None and not self.using_local: emb = (emb - self.embeddings_mean) / self.embeddings_std # Отримуємо передбачення predictions = self.predict_classes(emb, threshold) if not predictions: return {"message": text, "result": "Жодного класу не знайдено"} # Форматуємо результати formatted_results = [] for cls, score in predictions.items(): # Конвертуємо score в відсотки, обмежуємо до 100% score_percent = min(abs(float(score)) * 100, 100) formatted_results.append(f"{cls}: {score_percent:.2f}%") return { "message": text, "result": " ".join(formatted_results) } def classify_rows(self, filter_substring: str = "", threshold: float = 0.3) -> pd.DataFrame: """ Класифікація всіх або відфільтрованих рядків Args: filter_substring: підрядок для фільтрації threshold: поріг впевненості Returns: pd.DataFrame: результати класифікації """ if self.class_signatures is None: raise ValueError("Спочатку збудуйте signatures!") if self.df is None or self.embeddings is None: raise ValueError("Дані не завантажені! Спочатку викличте load_data.") if filter_substring: filtered_idx = self.df[self.df["Message"].str.contains(filter_substring, case=False, na=False)].index else: filtered_idx = self.df.index for cls in self.class_signatures.keys(): self.df[f"Score_{cls}"] = 0.0 for i in filtered_idx: emb_vec = self.embeddings[i] predictions = self.predict_classes(emb_vec, threshold=threshold) for cls, score in predictions.items(): self.df.at[i, f"Score_{cls}"] = score main_classes = [cls for cls, score in predictions.items() if score > threshold] self.df.at[i, "Target"] = "|".join(main_classes) if main_classes else "None" result_columns = ["Message", "Target"] + [f"Score_{cls}" for cls in self.class_signatures.keys()] result_df = self.df.loc[filtered_idx, result_columns].copy() return result_df.reset_index(drop=True) def save_results(self, output_path: str = "messages_with_labels.csv") -> str: """ Зберігання результатів класифікації Args: output_path: шлях для збереження результатів Returns: str: повідомлення про результат """ if self.df is None: return "Дані відсутні!" self.df.to_csv(output_path, index=False) return f"Дані збережено у файл {output_path}" def save_model_info(self, path: str = "model_info.json") -> None: """ Зберігання інформації про поточний стан моделі Args: path: шлях для збереження """ info = { "using_local": self.using_local, "classes_count": len(self.classes_json), "signatures_count": len(self.class_signatures) if self.class_signatures else 0, "cache_stats": self.get_cache_stats(), } if self.using_local: info["local_model"] = self.local_embedder.get_model_info() with open(path, 'w', encoding='utf-8') as f: json.dump(info, f, indent=2) def evaluate_classification(self, csv_path: str, threshold: float = 0.3) -> tuple[pd.DataFrame, dict]: """ Оцінка класифікації текстів з CSV файлу Args: csv_path: шлях до CSV файлу з колонками Category та Question threshold: поріг впевненості для класифікації Returns: tuple[pd.DataFrame, dict]: результати класифікації та статистика """ if self.class_signatures is None: raise ValueError("Спочатку збудуйте signatures!") # Завантаження даних print(f"\nЗавантаження даних з {csv_path}...") df = pd.read_csv(csv_path) if not {'Category', 'Question'}.issubset(df.columns): raise ValueError("CSV повинен містити колонки 'Category' та 'Question'") # Підготовка результатів results = [] total = len(df) print(f"Знайдено {total} рядків для класифікації") print(f"Використовується {'OpenAI' if not self.using_local else 'локальна'} модель") for idx, row in df.iterrows(): if idx % 10 == 0: # Логуємо прогрес кожні 10 рядків print(f"Обробка рядка {idx + 1}/{total}") try: # Отримуємо ембедінг для питання emb = np.array(self.get_embedding(row['Question'])) # Нормалізуємо ембедінг emb_norm = np.linalg.norm(emb) if emb_norm > 0: emb = emb / emb_norm # Отримуємо всі передбачення predictions = self.predict_classes(emb, threshold) # Формуємо список класів за рівнем впевненості sorted_classes = list(predictions.keys()) # Знаходимо позицію очікуваного класу expected_class = row['Category'] expected_position = sorted_classes.index(expected_class) + 1 if expected_class in sorted_classes else -1 # Отримуємо рівень впевненості для очікуваного класу expected_confidence = predictions.get(expected_class, 0.0) # Додаємо результат results.append({ 'Category': row['Category'], 'Question': row['Question'], 'ExpectedClassPosition': expected_position, 'ExpectedClassConfidence': expected_confidence, 'ClassificationResults': json.dumps(predictions, ensure_ascii=False) }) except Exception as e: print(f"Помилка при обробці рядка {idx + 1}: {str(e)}") results.append({ 'Category': row['Category'], 'Question': row['Question'], 'ExpectedClassPosition': -1, 'ExpectedClassConfidence': 0.0, 'ClassificationResults': json.dumps({}) }) print("\nОбробка завершена") results_df = pd.DataFrame(results) statistics = self.get_evaluation_statistics(results_df) return results_df, statistics def save_evaluation_results(self, df: pd.DataFrame, output_path: str = "evaluation_results.csv") -> str: """ Зберігає результати оцінки класифікації Args: df: DataFrame з результатами output_path: шлях для збереження файлу Returns: str: повідомлення про результат """ try: df.to_csv(output_path, index=False) return f"Результати збережено у файл {output_path}" except Exception as e: return f"Помилка при збереженні результатів: {str(e)}" def get_evaluation_statistics(self, df: pd.DataFrame) -> dict: """ Розраховує статистику по результатам класифікації Args: df: DataFrame з результатами класифікації Returns: dict: статистика класифікації """ total = len(df) found_mask = df['ExpectedClassPosition'] != -1 correct_first = (df['ExpectedClassPosition'] == 1).sum() in_top3 = (df['ExpectedClassPosition'].between(1, 3)).sum() not_found = (~found_mask).sum() # Середня впевненість для коректних класифікацій mean_confidence = df[df['ExpectedClassPosition'] == 1]['ExpectedClassConfidence'].mean() # Підрахунок по діапазонах впевненості confidence_ranges = { "90-100%": ((df['ExpectedClassConfidence'] >= 0.9) & found_mask).sum(), "70-90%": ((df['ExpectedClassConfidence'].between(0.7, 0.9)) & found_mask).sum(), "50-70%": ((df['ExpectedClassConfidence'].between(0.5, 0.7)) & found_mask).sum(), "<50%": ((df['ExpectedClassConfidence'] < 0.5) & found_mask).sum() } return { "total_samples": total, "correct_first_place": { "count": int(correct_first), "percentage": round(correct_first/total * 100, 1) }, "in_top3": { "count": int(in_top3), "percentage": round(in_top3/total * 100, 1) }, "not_found": { "count": int(not_found), "percentage": round(not_found/total * 100, 1) }, "mean_confidence_correct": round(mean_confidence * 100, 1) if not np.isnan(mean_confidence) else 0, "confidence_distribution": { k: { "count": int(v), "percentage": round(v/total * 100, 1) } for k, v in confidence_ranges.items() } } def evaluate_classification(self, csv_path: str, threshold: float = 0.3) -> tuple[pd.DataFrame, dict]: """ Оцінка класифікації текстів з CSV файлу Args: csv_path: шлях до CSV файлу з колонками Category та Question threshold: поріг впевненості для класифікації Returns: tuple[pd.DataFrame, dict]: результати класифікації та статистика """ if self.class_signatures is None: raise ValueError("Спочатку збудуйте signatures!") # Завантаження даних df = pd.read_csv(csv_path) if not {'Category', 'Question'}.issubset(df.columns): raise ValueError("CSV повинен містити колонки 'Category' та 'Question'") # Підготовка результатів results = [] for idx, row in df.iterrows(): # Отримуємо ембедінг для питання emb = np.array(self.get_embedding(row['Question'])) # Нормалізуємо якщо потрібно if self.embeddings_mean is not None and self.embeddings_std is not None and not self.using_local: emb = (emb - self.embeddings_mean) / self.embeddings_std # Отримуємо всі передбачення predictions = self.predict_classes(emb, threshold) # Формуємо список класів за рівнем впевненості sorted_classes = list(predictions.keys()) # Знаходимо позицію очікуваного класу expected_class = row['Category'] expected_position = sorted_classes.index(expected_class) + 1 if expected_class in sorted_classes else -1 # Отримуємо рівень впевненості для очікуваного класу expected_confidence = predictions.get(expected_class, 0.0) # Додаємо результат results.append({ 'Category': row['Category'], 'Question': row['Question'], 'ExpectedClassPosition': expected_position, 'ExpectedClassConfidence': expected_confidence, 'ClassificationResults': json.dumps(predictions) }) results_df = pd.DataFrame(results) statistics = self.get_evaluation_statistics(results_df) return results_df, statistics @staticmethod def load_model_info(path: str) -> dict: """ Завантаження інформації про модель Args: path: шлях до файлу з інформацією Returns: dict: інформація про модель """ with open(path, 'r', encoding='utf-8') as f: return json.load(f)