Spaces:
Runtime error
Runtime error
"""FastAPI endpoint | |
To run locally use 'uvicorn app:app --host localhost --port 7860' | |
or | |
`python -m uvicorn app:app --reload --host localhost --port 7860` | |
""" | |
import datetime as dt | |
import json | |
import logging | |
import sys | |
import spacy | |
# sys.setrecursionlimit(20000) | |
import pandas as pd | |
import numpy as np | |
import os | |
import random | |
from typing import Dict, List | |
import uvicorn | |
from fastapi import FastAPI, HTTPException, Request, Response | |
from fastapi.responses import HTMLResponse, JSONResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from rouge_score import rouge_scorer | |
# Scripts | |
import scripts.sentiment as sentiment | |
import scripts.twitter_scraper as ts | |
from scripts import sentiment | |
from scripts.summarization import bert_summarization | |
from scripts.twitter_scraper import get_latest_account_tweets | |
from scripts.sentiment import twitter_sentiment_api_score | |
from scripts import twitter_scraper as ts | |
import scripts.utils as utils | |
from scripts import translation | |
from scripts import generative | |
import nltk | |
# Configure logging before the first log call. Emitting a record through the
# module-level logging functions first installs a default WARNING-level
# handler, which turns a later basicConfig(level=INFO) into a no-op.
logging.basicConfig(level=logging.INFO)

# Fetch the punkt tokenizer data (nltk.sent_tokenize is used further down).
nltk.download('punkt')
# NOTE(review): this is only the first entry of NLTK's search path, logged for
# debugging; it is not guaranteed to be where the download actually landed.
punkt_download_location = nltk.data.path[0]
logging.info(f"punkt_download_location: {punkt_download_location}")

pd.set_option('display.max_colwidth', 20)

app = FastAPI()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

# Construct absolute path to models folder
models_path = os.path.abspath("models")

# Twitter handles the endpoints accept; other handles are treated as out of scope.
username_list = [
    "alikarimi_ak8",
    "elonmusk",
    "BarackObama",
    "taylorlorenz",
    "cathiedwood",
    "ylecun",
]

## Static objects/paths
# Date window interpolated into the `since:` / `until:` search queries.
start_date = dt.date(year=2023, month=2, day=1)
end_date = dt.date(year=2023, month=3, day=22)

# Load spacy module on app start
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("sentencizer")
async def webpage(request: Request):
    """Render the ``index.html`` template for the incoming request.

    Args:
        request: The HTTP request, passed to the template context under
            the ``"request"`` key.

    Returns:
        The template response produced by ``templates.TemplateResponse``.
    """
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)
async def get_accounts() -> HTMLResponse:
    """Build an HTML table of account information for every tracked handle.

    Looks up each handle in ``username_list`` through
    ``ts.get_twitter_account_info``, puts the results in a DataFrame and
    renders it as a styled table with in-cell bars and thousands separators
    on the ``follower_count`` and ``friends_count`` columns.

    Returns:
        An ``HTMLResponse`` with status 200 whose body is the rendered table.
    """
    logging.info(f"Pulling account information on {username_list}")
    account_info_list = [
        ts.get_twitter_account_info(twitter_handle=account) for account in username_list
    ]

    styled_accounts = pd.DataFrame(account_info_list).style.bar(
        subset=["follower_count", "friends_count"], color="#d65f5f"
    )
    styled_accounts = styled_accounts.format(
        {"follower_count": "{:,.0f}", "friends_count": "{:,.0f}"}
    )
    # Styler.to_html() does not honour DataFrame.to_html()'s `index` and
    # `classes` keywords (they are swallowed as template kwargs), so the index
    # is hidden on the Styler and the CSS class is set via table_attributes.
    styled_accounts = styled_accounts.hide(axis="index")
    html_table = styled_accounts.to_html(table_attributes='class="center"')
    return HTMLResponse(content=html_table, status_code=200)
def get_tweets_username(username: str) -> dict:
    """Fetch a handle's tweets, score their sentiment and return them as HTML + records.

    Args:
        username: Twitter handle passed to ``ts.get_tweets``.

    Returns:
        On success, a ``JSONResponse`` (status 200) with two keys: ``"html"``
        (the tweets rendered as an HTML table) and ``"data"`` (the same rows
        as a list of records). If ``ts.get_tweets`` does not return a
        DataFrame, that value is returned unchanged after printing an error.

    Side effects:
        Writes the rendered table to ``df_tweets_html.html`` in the current
        working directory.
    """
    # Method 2: Use Snscrape
    df_tweets = ts.get_tweets(handle=username)
    if not isinstance(df_tweets, pd.DataFrame):
        print("Error: Failed to retrieve tweets.")
        return df_tweets

    # Work on a copy of the column subset so the assignments below modify an
    # independent frame instead of a slice of the scraper's result.
    df_tweets = df_tweets[
        ["handle", "created_at", "retweet_count", "view_count", "like_count", "full_text"]
    ].copy()
    df_tweets["created_at"] = df_tweets["created_at"].dt.strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    # The zero-padded timestamp strings sort in chronological order.
    df_tweets = df_tweets.sort_values("created_at", ascending=False)

    # Additional processing
    logging.info("Running sentiment on tweets")
    sentiments = twitter_sentiment_api_score(
        df_tweets['full_text'].to_list(), use_api=False
    )
    df_tweets["sentiment"] = [s['argmax'] for s in sentiments]
    # if username == "alikarimi_ak8":
    #     p = translation.PersianTextProcessor()
    #     df_tweets['full_text_translated'] = df_tweets["full_text"].apply(lambda c: p.translate_text(persian_text = c))

    df_tweets_html = df_tweets.to_html(classes="center", index=False, escape=False)
    # Close the dump file deterministically and write it as UTF-8 so tweets
    # with non-ASCII text do not depend on the platform's default encoding.
    with open("df_tweets_html.html", "w", encoding="utf-8") as html_file:
        df_tweets.to_html(html_file)

    df_tweets_data = df_tweets.to_dict(orient="records")
    response_data = {"html": df_tweets_html, "data": df_tweets_data}
    return JSONResponse(content=response_data, status_code=200)
async def get_audience(username: str) -> dict:
    """Summarise the accounts replying to a sample of a handle's tweets.

    Samples up to five of the handle's tweets from the ``start_date`` ..
    ``end_date`` window, collects the replies to each of them, looks up the
    account information of every replying user and averages it.

    Args:
        username: Twitter handle; must be one of ``username_list``.

    Returns:
        A dict with ``sample_size`` (number of replying accounts looked up)
        and ``mean_follower_count``, ``mean_friends_count`` and
        ``mean_verified`` rounded to three decimals. The means are ``None``
        when no replies were found. For a handle outside ``username_list`` a
        404 ``Response`` is returned instead.
    """
    if username not in username_list:
        return Response(content="Account not in scope of project.", status_code=404)

    query = f"from:{username} since:{start_date} until:{end_date}"
    tweets = ts.get_tweets(query=query)

    # Randomly sample up to 5 tweets from the user; random.sample raises
    # ValueError when asked for more items than the population holds.
    n_samples = min(5, len(tweets))
    tweets_sampled = random.sample(tweets, n_samples)

    # Get all replies to sampled tweets
    tweet_threads = []
    for tweet in tweets_sampled:
        tweet_threads += ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )

    # Get user info for every account that replied to a sampled tweet
    info_accounts = [
        ts.get_twitter_account_info(twitter_handle=reply["username"])
        for reply in tweet_threads
    ]

    # Each info record looks like:
    # {"follower_count": 1, "friends_count": 20, "verified": false}
    follower_counts = [info["follower_count"] for info in info_accounts]
    friends_counts = [info["friends_count"] for info in info_accounts]
    verified_counts = [1 if info["verified"] else 0 for info in info_accounts]

    def _rounded_mean(values):
        # np.mean([]) is NaN, which cannot be serialised to JSON.
        return round(np.mean(values), 3) if values else None

    return {
        "sample_size": len(info_accounts),
        "mean_follower_count": _rounded_mean(follower_counts),
        "mean_friends_count": _rounded_mean(friends_counts),
        "mean_verified": _rounded_mean(verified_counts),
    }
async def get_sentiment(username: str) -> Dict[str, Dict[str, float]]:
    """Score the sentiment of a handle's tweets and of the replies to a sample of them.

    Args:
        username: Twitter handle; must be one of ``username_list``.

    Returns:
        A dict with two entries, each holding a ``mean`` (rounded to two
        decimals) and a ``confidence_interval`` from
        ``utils.wilson_score_interval``:

        * ``thread_level``: scores of the replies to the sampled tweets.
        * ``audience_level``: scores of the handle's own tweets.

        NOTE(review): the label/score pairing is kept as found; confirm that
        "audience_level" is really meant to carry the handle's own tweets.

    Raises:
        HTTPException: 404 when ``username`` is not in ``username_list``.
    """
    if username not in username_list:
        raise HTTPException(status_code=404, detail="Account not in scope of project.")

    query = f"from:{username} since:{start_date} until:{end_date}"
    tweets = ts.get_tweets(query=query)

    # random.sample raises ValueError when asked for more items than exist,
    # so never request more tweets than the query returned.
    n_samples = min(5, len(tweets))
    tweets_sampled = random.sample(tweets, n_samples)

    tweet_threads = []
    for tweet in tweets_sampled:
        tweet_threads += ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )
    print(
        f"Total replies to {n_samples} sampled tweets from username: {username}, {len(tweet_threads)}"
    )

    ## Sentiment scoring
    # Sentiment of the handle's own tweets
    print(f"Running tweet sentiment scoring on username: {username} tweets")
    tweets_scores = sentiment.get_tweets_sentiment(tweets=tweets)
    mean_tweets_score = round(np.mean(tweets_scores), 2)
    ci_tweets = utils.wilson_score_interval(tweets_scores)

    # Sentiment of the reply threads under the sampled tweets
    print(f"Running tweet thread sentiment scoring on username: {username} tweets")
    threads_scores = sentiment.get_tweets_sentiment(tweets=tweet_threads)
    mean_threads_score = round(np.mean(threads_scores), 2)
    ci_threads = utils.wilson_score_interval(threads_scores)

    return {
        "thread_level": {
            "mean": mean_threads_score,
            "confidence_interval": ci_threads,
            # Misspelled key kept so existing clients that read it keep
            # working; new code should read "confidence_interval".
            "confidence_interal": ci_threads,
        },
        "audience_level": {
            "mean": mean_tweets_score,
            "confidence_interval": ci_tweets,
        },
    }
## APIs: Primarily called by the index page | |
async def generate_text(request: Request):
    """Generate text from a prompt.

    The JSON body must provide ``"text"`` (the prompt) and ``"account"`` (the
    name of a model folder under ``models_path``).

    Args:
        request: The HTTP request.

    Returns:
        ``{"generated_text": ...}`` with duplicate sentences removed and the
        last sentence dropped, or ``{"error": "Error generating text"}`` when
        the request is invalid or generation fails.
    """
    print("*" * 50)
    data = await request.json()
    print("*" * 50)
    logging.info("POST to api/generate received and processing")
    try:
        # The account name comes straight from the request body, so resolve it
        # and refuse anything that escapes the models folder ("../", absolute
        # paths) before handing the directory to the model loader.
        model_dir = os.path.abspath(os.path.join(models_path, data["account"]))
        if os.path.commonpath([models_path, model_dir]) != models_path:
            raise ValueError(f"model directory outside of models folder: {model_dir}")
        generated_text = generative.generate_account_text(
            prompt=data["text"], model_dir=model_dir
        )
        logging.info("INFO: Successfully generate text from model.")
    except Exception as e:
        logging.error(f"Error generating text: {e}")
        return {"error": "Error generating text"}

    # return one example
    generated_text = generated_text[0]["generated_text"]

    ###################################################
    ## Clean up generate text
    sentences = nltk.sent_tokenize(generated_text)
    # dict.fromkeys drops repeated sentences while keeping first-seen order.
    non_duplicate_sentences = list(dict.fromkeys(sentences))
    # Get rid of final sentence, but keep it when it is the only one so a
    # single-sentence generation is not returned as an empty string.
    if len(non_duplicate_sentences) > 1:
        non_duplicate_sentences = non_duplicate_sentences[:-1]
    final_text = " ".join(non_duplicate_sentences)
    return {"generated_text": final_text}
async def generate_summary(request: Request):
    """Generate summary from tweets

    The JSON body must provide ``"tweetsData"``, a list of records that each
    carry a ``"full_text"`` entry. Roughly 10% of the sentences found in the
    tweets (at least one) are sampled at random, cleaned and summarised.

    Args:
        request: The HTTP request.

    Returns:
        ``{"tweets_summary": ...}`` with the generated summary; the summary is
        an empty string when the tweets contain no sentences.
    """
    print("*" * 50)
    data = await request.json()
    print("data", data["tweetsData"])

    # Get the list of text
    tweets = [t["full_text"] for t in data["tweetsData"]]
    # Concatenate tweets into a single string, ending each tweet with a period
    # so the sentence splitter sees a boundary between consecutive tweets.
    text = ". ".join(tweets)
    sentences = list(nlp(text).sents)
    if not sentences:
        return {"tweets_summary": ""}

    # Option 2: summarise a random ~10% sample of the sentences. Always keep
    # at least one, otherwise short inputs would be summarised from nothing.
    sample_size = max(1, int(0.1 * len(sentences)))
    sampled_sentences = random.sample(sentences, sample_size)
    sampled_sentences = [sentiment.tweet_cleaner(s.text) for s in sampled_sentences]

    # Join the strings into one text blob
    tweet_blob = " ".join(sampled_sentences)

    # Generate the summary
    summary = bert_summarization(tweet_blob)
    print("Summary:", summary)

    # Return the summary
    return {"tweets_summary": summary}
## Historical Tweets pages | |
async def read_examples():
    """Return ``templates/charts/handle_sentiment_breakdown.html`` as an HTML response.

    NOTE(review): this module defines a second function with the same name
    further down, which rebinds ``read_examples`` at import time.
    """
    chart_path = "templates/charts/handle_sentiment_breakdown.html"
    with open(chart_path) as chart_file:
        return HTMLResponse(content=chart_file.read())
async def read_examples():
    """Return ``templates/charts/handle_sentiment_timesteps.html`` as an HTML response.

    NOTE(review): an earlier function in this module has the same name; this
    definition is the one the module-level name ``read_examples`` ends up
    bound to.
    """
    chart_path = "templates/charts/handle_sentiment_timesteps.html"
    with open(chart_path) as chart_file:
        return HTMLResponse(content=chart_file.read())
# uvicorn --workers=2 app:app | |
if __name__ == "__main__":
    # Script entry point: serve "app:app" on 127.0.0.1:5050 with auto-reload.
    # uvicorn.run(app, host="0.0.0.0", port=8000)
    server_options = {"host": "127.0.0.1", "port": 5050, "reload": True}
    uvicorn.run("app:app", **server_options)