import os
import re

import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import Dataset
from nltk.corpus import stopwords
from sklearn.metrics import (
    mean_squared_error, r2_score,
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, cohen_kappa_score
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
from transformers import (
    AutoConfig, AutoModel, AutoTokenizer,
    BertConfig, IntervalStrategy, Trainer, TrainingArguments,
)
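
# NOTE: assumes the NLTK stopwords corpus and the spaCy "en_core_web_sm" model
# are already available; if not, install them once with:
#   python -m nltk.downloader stopwords
#   python -m spacy download en_core_web_sm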
class TinyFinBERTRegressor(nn.Module):
    """TinyBERT encoder with a single linear regression head on the [CLS] token."""
    def __init__(self, pretrained_model='huawei-noah/TinyBERT_General_4L_312D'):
        super().__init__()
        if pretrained_model:
            self.config = AutoConfig.from_pretrained(pretrained_model)
            self.bert = AutoModel.from_pretrained(pretrained_model, config=self.config)
        else:
            # AutoConfig/AutoModel cannot be instantiated directly; fall back to
            # a default BertConfig and build an untrained encoder from it.
            self.config = BertConfig()
            self.bert = AutoModel.from_config(self.config)
        self.regressor = nn.Linear(self.config.hidden_size, 1)
        # Manually register the position_ids buffer to avoid a missing-key error
        # when loading state dicts saved with other transformers versions
        self.bert.embeddings.register_buffer(
            "position_ids",
            torch.arange(512).expand((1, -1)),
            persistent=False,
        )
    def forward(self, input_ids=None, attention_mask=None, labels=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0]  # [CLS] token embedding
        # squeeze(-1), not squeeze(), so the batch dimension survives batch size 1
        score = self.regressor(cls_output).squeeze(-1)
        # cast labels to float32 to match the model output dtype
        loss = F.mse_loss(score, labels.float()) if labels is not None else None
        return {'loss': loss, 'score': score}
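
# Minimal usage sketch (hypothetical; an untrained head gives arbitrary scores):
#   model = TinyFinBERTRegressor()
#   tok = AutoTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
#   enc = tok("Revenue grew 12% year over year.", return_tensors="pt")
#   out = model(input_ids=enc["input_ids"], attention_mask=enc["attention_mask"])
#   print(out["score"].item())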
def preprocess_texts(texts):
    """Lowercase, strip non-letters, and lemmatize each text."""
    nlp = spacy.load("en_core_web_sm", disable=["ner", "parser"])  # speeds up processing
    # Kept for optional stopword filtering (currently disabled below);
    # negation words are excluded so they would survive filtering.
    negations = {'no', 'not', 'none', 'nobody', 'nothing', 'neither', 'nowhere', 'never',
                 'hardly', 'scarcely', 'barely', "n't", "without", "unless", "nor"}
    stop_words = set(stopwords.words('english')) - negations
    processed = []
    for text in texts:
        text = text.lower()
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        doc = nlp(text)
        tokens = [
            token.lemma_ for token in doc
            if token.lemma_.strip()  # stopword filter disabled: token.lemma_ not in stop_words
        ]
        processed.append(' '.join(tokens))
    return processed
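
# Illustrative example (output is approximate and depends on the spaCy model):
#   preprocess_texts(["Shares were rising sharply."])
#   -> roughly ["share be rise sharply"]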
def load_phrasebank(path):
    """Parse Financial PhraseBank lines of the form 'sentence@label'."""
    with open(path, 'r', encoding='latin1') as f:
        lines = f.readlines()
    sents, scores = [], []
    for line in lines:
        if '@' in line:
            # rsplit guards against '@' appearing inside the sentence itself
            s, l = line.strip().rsplit('@', 1)
            score = 0.0 if l.lower() == 'neutral' else (-1.0 if l.lower() == 'negative' else 1.0)
            sents.append(s)
            scores.append(score)
    return pd.DataFrame({'text': sents, 'score': scores})
def load_words_phrases(path):
    """Parse lexicon lines: free text followed by a trailing numeric score."""
    with open(path, 'r', encoding='latin1') as f:
        lines = f.readlines()
    data = []
    for line in lines:
        line = line.strip()
        match = re.search(r',(-?\d+\.?\d*)$', line)
        if match:
            text = line[:match.start()].strip()
            score = float(match.group(1))
            data.append((text, score))
    return pd.DataFrame(data, columns=["text", "score"])
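
# Expected lexicon line format (illustrative, derived from the regex above):
#   strong growth,0.8
#   severe losses,-0.9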
def train_model(phrase_path, words_path, save_path):
    os.makedirs(save_path, exist_ok=True)
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    phrase_df = load_phrasebank(phrase_path)
    words_df = load_words_phrases(words_path)
    phrase_df['text'] = preprocess_texts(phrase_df['text'])
    words_df['text'] = preprocess_texts(words_df['text'])
    # Hold out 20% of the PhraseBank for evaluation; the lexicon is train-only
    train_phrase, test_phrase = train_test_split(phrase_df, test_size=0.2, random_state=42)
    train_df = pd.concat([train_phrase, words_df])
    test_df = test_phrase.reset_index(drop=True)
    tokenizer = AutoTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
    def tokenize(batch):
        tokens = tokenizer(batch["text"], padding='max_length', truncation=True, max_length=128)
        tokens["labels"] = batch["score"]
        return tokens
    train_dataset = Dataset.from_pandas(train_df).map(tokenize, batched=True)
    test_dataset = Dataset.from_pandas(test_df).map(tokenize, batched=True)
    args = TrainingArguments(
        output_dir=os.path.join(save_path, "results"),
        # NOTE: newer transformers releases rename this to eval_strategy
        evaluation_strategy=IntervalStrategy.EPOCH,
        save_strategy=IntervalStrategy.EPOCH,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        num_train_epochs=5,
        weight_decay=0.01,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
    )
    model = TinyFinBERTRegressor().to(device)
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        tokenizer=tokenizer,
        compute_metrics=lambda pred: {
            "mse": mean_squared_error(pred.label_ids, pred.predictions),
            "r2": r2_score(pred.label_ids, pred.predictions)
        }
    )
    trainer.train()
    # Save the model and tokenizer
    model_to_save = model.module if hasattr(model, 'module') else model  # handle distributed/parallel training
    torch.save(model_to_save.state_dict(), os.path.join(save_path, "regressor_model.pt"))
    tokenizer.save_pretrained(save_path)
    print(f"Model saved to {save_path}")
def evaluate_model(phrase_path, model_path):
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    phrase_df = load_phrasebank(phrase_path)
    # Recreate the same held-out split used in train_model (same random_state)
    _, test_df = train_test_split(phrase_df, test_size=0.2, random_state=42)
    test_df = test_df.copy()  # avoid SettingWithCopyWarning on the split slice
    test_df['text'] = preprocess_texts(test_df['text'])
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = TinyFinBERTRegressor()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()
    y_true, y_pred, y_scores = [], [], []
    for _, row in test_df.iterrows():
        inputs = tokenizer(row["text"], return_tensors="pt", truncation=True, padding='max_length', max_length=128)
        inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
        with torch.no_grad():
            score = model(**inputs)["score"].item()
        y_scores.append(score)
        y_true.append(row["score"])
    # Regression metrics
    mse = mean_squared_error(y_true, y_scores)
    r2 = r2_score(y_true, y_scores)
    # Discretize scores: > 0.3 positive, < -0.3 negative, otherwise neutral
    y_pred = [1 if s > 0.3 else -1 if s < -0.3 else 0 for s in y_scores]
    y_true_classes = [int(round(s)) for s in y_true]
    acc = accuracy_score(y_true_classes, y_pred)
    prec = precision_score(y_true_classes, y_pred, average='weighted', zero_division=0)
    rec = recall_score(y_true_classes, y_pred, average='weighted')
    f1 = f1_score(y_true_classes, y_pred, average='weighted')
    kappa = cohen_kappa_score(y_true_classes, y_pred)
    cm = confusion_matrix(y_true_classes, y_pred)
    # Note: ROC-AUC here is computed from the hard class predictions rather than
    # the continuous scores, so it is only a coarse approximation.
    y_true_bin = label_binarize(y_true_classes, classes=[-1, 0, 1])
    y_score_bin = label_binarize(y_pred, classes=[-1, 0, 1])
    roc_auc = roc_auc_score(y_true_bin, y_score_bin, average='macro', multi_class='ovo')
| print(f"Sentiment Regression Metrics:") | |
| print(f"- MSE: {mse:.4f}") | |
| print(f"- R²: {r2:.4f}") | |
| print(f"- Accuracy: {acc:.4f}") | |
| print(f"- Precision: {prec:.4f}") | |
| print(f"- Recall: {rec:.4f}") | |
| print(f"- F1 Score: {f1:.4f}") | |
| print(f"- ROC-AUC: {roc_auc:.4f}") | |
| print(f"- Cohen's Kappa: {kappa:.4f}") | |
| print(f"- Confusion Matrix:\n{cm}") | |
def test(model_path):
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = TinyFinBERTRegressor()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()
    texts = [
        "The company's earnings exceeded expectations.",
        "They faced major losses this quarter.",
        "They didn't face major losses this quarter.",
        "Stock prices remained the same.",
        "boost",
        "strong boost",
        "AMD was not able to reduce losses.",
        "AMD reduced debt significantly, improves balance sheet",
        "Economic indicators point to contraction in telecom sector",
        "Company didn't have increased losses over last years."
    ]
    # Preprocess all texts in one call so the spaCy model is loaded only once
    clean_texts = preprocess_texts(texts)
    for text, clean_text in zip(texts, clean_texts):
        print(f"Original Text: {text}")
        print(f"Processed Text: {clean_text}")
        tokens = tokenizer.tokenize(clean_text)
        print(f"Tokens: {tokens}")
        inputs = tokenizer(clean_text, return_tensors="pt", truncation=True, padding='max_length', max_length=128)
        inputs = {k: v.to(device) for k, v in inputs.items() if k != "token_type_ids"}
        with torch.no_grad():
            score = model(**inputs)["score"].item()
        print(f"Predicted Sentiment Score: {score:.3f}")
        sentiment = "positive" if score > 0.3 else "negative" if score < -0.3 else "neutral"
        print(f"Sentiment: {sentiment}\n")
def init_model():
    """Initialize the model with position_ids registered, whether loaded or created new."""
    model = TinyFinBERTRegressor()
    # Make sure position_ids is registered (defensive; __init__ already does this)
    if not hasattr(model.bert.embeddings, 'position_ids'):
        model.bert.embeddings.register_buffer(
            "position_ids",
            torch.arange(512).expand((1, -1)),
            persistent=False,
        )
    return model
def create_api_model(model_path):
    """Create a model suitable for a FastAPI application."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    # Initialize model with position_ids properly registered
    model = init_model()
    model.load_state_dict(torch.load(os.path.join(model_path, "regressor_model.pt"), map_location=device))
    model.to(device)
    model.eval()
    return model, tokenizer, device
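
# Hypothetical FastAPI wiring (a sketch, assuming this module is importable):
#   from fastapi import FastAPI
#   app = FastAPI()
#   model, tokenizer, device = create_api_model("./saved_model")
#   @app.post("/score")
#   def score(text: str):
#       clean = preprocess_texts([text])[0]
#       enc = tokenizer(clean, return_tensors="pt", truncation=True,
#                       padding='max_length', max_length=128)
#       enc = {k: v.to(device) for k, v in enc.items() if k != "token_type_ids"}
#       with torch.no_grad():
#           return {"score": model(**enc)["score"].item()}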
if __name__ == "__main__":
    model_dir = "./saved_model"
    phrase_path = "./Sentences_50Agree.txt"
    words_path = "./financial_sentiment_words_phrases_negations.csv"
    # Check for GPU availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    if not os.path.isfile(os.path.join(model_dir, "regressor_model.pt")):
        print("Training new model...")
        train_model(phrase_path, words_path, model_dir)
    else:
        print(f"Model found at {os.path.join(model_dir, 'regressor_model.pt')}")
    evaluate_model(phrase_path, model_dir)
    test(model_dir)