Spaces:
Runtime error
Runtime error
| """FastAPI endpoint | |
| To run locally use 'uvicorn app:app --host localhost --port 7860' | |
| or | |
| `python -m uvicorn app:app --reload --host localhost --port 7860` | |
| """ | |
| import datetime as dt | |
| import json | |
| import logging | |
| import sys | |
| import spacy | |
| #sys.setrecursionlimit(20000) | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import random | |
| from typing import Dict, List | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, Request, Response | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from rouge_score import rouge_scorer | |
| import scripts.sentiment as sentiment | |
| import scripts.twitter_scraper as ts | |
| from scripts import sentiment | |
| from scripts.summarization import bert_summarization | |
| from scripts.twitter_scraper import get_latest_account_tweets | |
| from scripts import twitter_scraper as ts | |
| import scripts.utils as utils | |
| from scripts import generative | |
| import nltk | |
# Root logger emits INFO records and above.
logging.basicConfig(level=logging.INFO)

# ASGI application; files under ./static are served at the /static prefix.
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 template loader rooted at ./templates, used by the HTML page handler.
templates = Jinja2Templates(directory="templates")
# Absolute location of the "models" folder, resolved against the current
# working directory at import time.
models_path = os.path.abspath("models")

# Twitter handles that the endpoints in this module treat as in scope.
username_list = [
    "alikarimi_ak8",
    "elonmusk",
    "BarackObama",
    "taylorlorenz",
    "cathiedwood",
    "ylecun",
]

## Static objects/paths
# Bounds of the date window interpolated into the tweet search queries.
start_date = dt.date(2023, 2, 1)
end_date = dt.date(2023, 3, 22)
async def webpage(request: Request):
    """Render the ``index.html`` template for the incoming request."""
    # The request object is handed to the template under the "request" key.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
async def get_accounts() -> HTMLResponse:
    """Return an HTML table of profile information for the in-scope accounts.

    One record per handle in ``username_list`` is fetched through
    ``ts.get_twitter_account_info`` and rendered with a pandas ``Styler``:
    in-cell bars and thousands separators on the ``follower_count`` and
    ``friends_count`` columns.

    Returns:
        An ``HTMLResponse`` (status 200) whose body is the rendered table.
    """
    logging.info(f"Pulling account information on {username_list}")
    account_info_list = [
        ts.get_twitter_account_info(twitter_handle=account) for account in username_list
    ]

    count_columns = ["follower_count", "friends_count"]
    styled = (
        pd.DataFrame(account_info_list)
        .style.bar(subset=count_columns, color="#d65f5f")
        .format({column: "{:,.0f}" for column in count_columns})
        # A Styler has no ``index=False`` switch; the index is hidden on the
        # Styler itself (``Styler.hide`` needs pandas >= 1.4).
        .hide(axis="index")
    )

    # ``Styler.to_html`` forwards unknown keywords such as ``classes`` and
    # ``index`` to the template, where they have no effect. The CSS class has
    # to be supplied through ``table_attributes`` instead.
    html_table = styled.to_html(table_attributes='class="center"')
    return HTMLResponse(content=html_table, status_code=200)
def get_tweets_username(username: str) -> dict:
    """Fetch tweets for ``username`` and return them as HTML plus records.

    Args:
        username: Twitter handle passed to ``ts.get_tweets(handle=...)``.

    Returns:
        When ``ts.get_tweets`` yields a DataFrame: a ``JSONResponse``
        (status 200) with two keys, ``"html"`` (the rendered table) and
        ``"data"`` (a list of row dicts). Both are restricted to the
        ``handle``, ``created_at`` and ``full_text`` columns and sorted
        newest first. Otherwise the value returned by ``ts.get_tweets`` is
        passed through unchanged after printing an error line.

    Side effects:
        Writes the table to ``df_tweets_html.html`` in the current working
        directory on every successful call.
    """
    df_tweets = ts.get_tweets(handle=username)
    if not isinstance(df_tweets, pd.DataFrame):
        print("Error: Failed to retrieve tweets.")
        return df_tweets

    print(df_tweets.head(2))
    print(df_tweets.shape)

    # Take an explicit copy: assigning a column on a bare column-slice
    # triggers pandas' SettingWithCopyWarning and may not stick.
    df_tweets = df_tweets[["handle", "created_at", "full_text"]].copy()
    df_tweets["created_at"] = df_tweets["created_at"].dt.strftime("%Y-%m-%d %H:%M:%S")
    # The zero-padded timestamp format sorts lexicographically in
    # chronological order, so sorting the strings is sufficient.
    df_tweets = df_tweets.sort_values("created_at", ascending=False)

    # NOTE(review): escape=False emits tweet text as raw HTML. Tweet content
    # is untrusted; confirm the consumer sanitises it before rendering.
    df_tweets_html = df_tweets.to_html(classes="center", index=False, escape=False)

    # The context manager closes the dump file (the handle used to be
    # leaked); UTF-8 is explicit so non-ASCII tweet text cannot fail on a
    # platform-dependent default encoding.
    with open("df_tweets_html.html", "w", encoding="utf-8") as dump_file:
        df_tweets.to_html(dump_file)

    response_data = {
        "html": df_tweets_html,
        "data": df_tweets.to_dict(orient="records"),
    }
    return JSONResponse(content=response_data, status_code=200)
async def get_audience(username: str) -> dict:
    """Summarise the accounts replying to a sample of ``username``'s tweets.

    Up to five tweets from the configured date window are sampled, the
    replies to each are collected through ``ts.get_replies``, and account
    information is fetched for every reply author.

    Args:
        username: Twitter handle; must be listed in ``username_list``.

    Returns:
        A dict with ``sample_size``, ``mean_follower_count``,
        ``mean_friends_count`` and ``mean_verified`` (means rounded to three
        decimals, or ``None`` when no reply accounts were found). For a
        handle outside ``username_list`` a plain-text 404 ``Response`` is
        returned instead.
    """
    if username not in username_list:
        return Response(content="Account not in scope of project.", status_code=404)

    query = f"from:{username} since:{start_date} until:{end_date}"
    # NOTE(review): get_tweets_username calls ts.get_tweets(handle=...) and
    # treats the result as a DataFrame, while this call passes query= and
    # uses the result as a sequence of dict-like tweets. Confirm against
    # scripts.twitter_scraper.
    tweets = ts.get_tweets(query=query)

    # random.sample raises ValueError when asked for more items than the
    # population holds, so the sample size is capped at the tweets available.
    n_samples = min(5, len(tweets))
    tweets_sampled = random.sample(tweets, n_samples)

    # Collect every reply to the sampled tweets.
    tweet_threads = []
    for tweet in tweets_sampled:
        tweet_threads += ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )

    # Account information for the author of each reply.
    info_accounts = [
        ts.get_twitter_account_info(twitter_handle=reply["username"])
        for reply in tweet_threads
    ]

    if not info_accounts:
        # np.mean of an empty list is NaN, which is not valid JSON; report
        # the empty sample explicitly instead.
        return {
            "sample_size": 0,
            "mean_follower_count": None,
            "mean_friends_count": None,
            "mean_verified": None,
        }

    follower_counts = [info["follower_count"] for info in info_accounts]
    friends_counts = [info["friends_count"] for info in info_accounts]
    # Share of verified accounts: each flag contributes 1 or 0 to the mean.
    verified_flags = [1 if info["verified"] else 0 for info in info_accounts]

    return {
        "sample_size": len(info_accounts),
        "mean_follower_count": round(np.mean(follower_counts), 3),
        "mean_friends_count": round(np.mean(friends_counts), 3),
        "mean_verified": round(np.mean(verified_flags), 3),
    }
async def get_sentiment(username: str) -> Dict[str, Dict[str, float]]:
    """Score the sentiment of ``username``'s tweets and of replies to them.

    All tweets from the configured date window are scored; the replies are
    those to a random sample of up to five of these tweets.

    Args:
        username: Twitter handle; must be listed in ``username_list``.

    Returns:
        A dict with two entries, each holding a ``"mean"`` (rounded to two
        decimals) and a ``"confidence_interval"`` taken from
        ``utils.wilson_score_interval``:

        * ``"thread_level"``: scores of the sampled reply threads.
        * ``"audience_level"``: scores of the account's own tweets.

        NOTE(review): the annotation promises float values, but the interval
        entries hold whatever ``utils.wilson_score_interval`` returns.
        Confirm that the two agree.

    Raises:
        HTTPException: 404 when the handle is not in ``username_list`` or
            when no tweets were found for it.
    """
    if username not in username_list:
        raise HTTPException(status_code=404, detail="Account not in scope of project.")

    query = f"from:{username} since:{start_date} until:{end_date}"
    tweets = ts.get_tweets(query=query)
    if len(tweets) == 0:
        # Nothing to sample or score; previously this surfaced as an
        # unhandled ValueError from random.sample.
        raise HTTPException(status_code=404, detail="No tweets found for account.")

    # random.sample raises ValueError when asked for more items than the
    # population holds, so the sample size is capped at the tweets available.
    n_samples = min(5, len(tweets))
    tweets_sampled = random.sample(tweets, n_samples)

    tweet_threads = []
    for tweet in tweets_sampled:
        tweet_threads += ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )
    print(
        f"Total replies to {n_samples} sampled tweets from username: {username}, {len(tweet_threads)}"
    )

    ## Sentiment scoring of the account's own tweets
    print(f"Running tweet sentiment scoring on username: {username} tweets")
    tweets_scores = sentiment.get_tweets_sentiment(tweets=tweets)
    mean_tweets_score = round(np.mean(tweets_scores), 2)
    ci_tweets = utils.wilson_score_interval(tweets_scores)

    ## Sentiment scoring of the replies to the sampled tweets
    print(f"Running tweet thread sentiment scoring on username: {username} tweets")
    threads_scores = sentiment.get_tweets_sentiment(tweets=tweet_threads)
    mean_threads_score = round(np.mean(threads_scores), 2)
    ci_threads = utils.wilson_score_interval(threads_scores)

    return {
        "thread_level": {
            "mean": mean_threads_score,
            "confidence_interval": ci_threads,
            # Misspelled key from the original response, kept as an alias so
            # clients written against it keep working.
            "confidence_interal": ci_threads,
        },
        "audience_level": {
            "mean": mean_tweets_score,
            "confidence_interval": ci_tweets,
        },
    }
async def generate_text(request: Request):
    """Generate text from a prompt.

    The JSON body must provide ``"text"`` (the prompt) and ``"account"``
    (the name of a model folder below ``models_path``).

    Args:
        request: The HTTP request.

    Returns:
        ``{"generated_text": ...}`` with duplicate sentences removed and the
        final sentence dropped, or ``{"error": "Error generating text"}``
        when the input is rejected or generation fails.
    """
    print("*" * 50)
    data = await request.json()
    print("*" * 50)
    print("POST Request:")

    try:
        model_dir = os.path.join(models_path, data["account"])
        # "account" comes straight from the request body. Refuse values such
        # as "../x" or an absolute path that would resolve outside the
        # models folder.
        models_root = os.path.realpath(models_path)
        if os.path.commonpath([models_root, os.path.realpath(model_dir)]) != models_root:
            raise ValueError("account resolves outside the models folder")

        candidates = generative.generate_account_text(
            prompt=data["text"], model_dir=model_dir
        )
        # Only the first candidate is used. The lookup sits inside the try
        # so an unexpected result shape yields the same error payload.
        generated_text = candidates[0]["generated_text"]
    except Exception as e:
        logging.error(f"Error generating text: {e}")
        return {"error": "Error generating text"}
    logging.info("INFO: Successfully generate text from model.")

    ###################################################
    ## Clean up generate text
    sentences = nltk.sent_tokenize(generated_text)
    # dict.fromkeys removes repeated sentences while keeping first-seen order.
    unique_sentences = list(dict.fromkeys(sentences))
    # Get rid of final sentence, unless it is the only one, in which case
    # dropping it would return an empty string.
    if len(unique_sentences) > 1:
        unique_sentences = unique_sentences[:-1]
    final_text = " ".join(unique_sentences)
    return {"generated_text": final_text}
# Sentence-splitting pipeline, created on first use and then reused.
_SENTENCE_NLP = None


def _get_sentence_nlp():
    """Return the spaCy pipeline used for sentence splitting, loading it once."""
    global _SENTENCE_NLP
    if _SENTENCE_NLP is None:
        nlp = spacy.load("en_core_web_sm")
        nlp.add_pipe("sentencizer")
        _SENTENCE_NLP = nlp
    return _SENTENCE_NLP


async def generate_summary(request: Request):
    """Generate summary from tweets

    The JSON body must provide ``"tweetsData"``: a list of objects that each
    carry a ``"full_text"`` entry.

    Args:
        request: The HTTP request.

    Returns:
        ``{"tweets_summary": ...}`` holding the output of
        ``bert_summarization`` for a random ~10% sample of the tweets'
        sentences, or an empty string when there are no sentences.
    """
    print("*" * 50)
    data = await request.json()

    # Get the list of text
    tweets = [t["full_text"] for t in data["tweetsData"]]
    # Log the size only; the payload itself can be large.
    logging.info("Summarizing %d tweets", len(tweets))

    # Concatenate tweets into a single string
    # NOTE(review): the " ." separator puts the space before the period;
    # ". " may have been intended. Left unchanged; confirm.
    text = " .".join(tweets)

    # Loading the model is expensive, so the pipeline is built once and
    # reused instead of being reloaded on every request.
    sentences = list(_get_sentence_nlp()(text).sents)
    if not sentences:
        # Nothing to summarise. NOTE(review): assumes an empty string is an
        # acceptable summary value for the client; confirm.
        return {"tweets_summary": ""}

    # Sample roughly 10% of the sentences, but never fewer than one: for
    # short inputs int(0.1 * n) is 0, which produced an empty text blob.
    sample_size = max(1, int(0.1 * len(sentences)))
    sampled_sentences = random.sample(sentences, sample_size)
    cleaned_sentences = [sentiment.tweet_cleaner(s.text) for s in sampled_sentences]

    # Join the strings into one text blob
    tweet_blob = " ".join(cleaned_sentences)

    # Generate the summary
    summary = bert_summarization(tweet_blob)
    print("Summary:", summary)

    # Return the summary
    return {"tweets_summary": summary}
def _chart_response(path: str) -> HTMLResponse:
    """Read the HTML file at ``path`` and wrap it in an ``HTMLResponse``."""
    # The encoding is explicit so reading does not depend on the platform
    # default. NOTE(review): assumes the chart files are UTF-8; confirm.
    with open(path, encoding="utf-8") as f:
        return HTMLResponse(content=f.read())


# NOTE(review): the two handlers below share the name ``read_examples``, so
# at module level the second definition replaces the first. The names are
# kept unchanged here because they may be referenced elsewhere. Confirm how
# each is registered as a route and rename one of them.
async def read_examples():
    """Return the ``handle_sentiment_breakdown.html`` chart page."""
    return _chart_response("templates/charts/handle_sentiment_breakdown.html")


async def read_examples():  # noqa: F811
    """Return the ``handle_sentiment_timesteps.html`` chart page."""
    return _chart_response("templates/charts/handle_sentiment_timesteps.html")