# sentiement_app / app.py
# pknayak's picture
# Added category and specific domain
# b1a2ac2
import gradio as gr
from newsdataapi import NewsDataApiClient
import os
import json
import pandas as pd
# ----------------imports for Sentiment Analyzer----------------------
import re
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
nltk.download('rslp')
from nltk.stem import RSLPStemmer
import joblib
# --------------------------------imports for Data Visualization
from wordcloud import WordCloud
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.gridspec import GridSpec
import plotly.offline as py
import plotly.express as px
import plotly.graph_objs as go
#--------------------------------------------------------------------------------------
#------------------------ NEWS DATA RETRIEVER------------------------------------------
#--------------------------------------------------------------------------------------
def creating_data_dir(directory_path):
    """Ensure ``directory_path`` exists and report the outcome on stdout.

    Missing parent directories are created as well, and a directory that
    already exists is not treated as an error.

    Args:
        directory_path: Path of the directory to create.
    """
    os.makedirs(directory_path, exist_ok=True)

    # Confirm on disk instead of trusting that makedirs returned normally.
    if not os.path.exists(directory_path):
        print(f"Failed to create directory '{directory_path}'.")
        return
    print(f"Directory '{directory_path}' created successfully.")
def retrieve_news_per_keyword(api, keywords, domain):
    """Query the news API once per keyword and save each response as JSON.

    Each response is written to ``response_<keyword>.json`` inside the
    module-level ``directory_path``.  A keyword is removed from ``keywords``
    (in place) once its response has been saved, so the list is empty after
    a complete run and still holds the unprocessed keywords if a call fails.

    Args:
        api: Client object exposing ``news_api(**params)``.
        keywords: Mutable list of keyword strings; consumed in place.
        domain: Key of the module-level ``domain_dict``.

    Raises:
        KeyError: If ``domain`` is not a key of ``domain_dict``.
    """
    selected_domain_url = domain_dict[domain]

    # Iterate over a snapshot: removing items from the very list that is
    # being iterated makes the loop skip every other keyword.
    for keyword in list(keywords):
        # NOTE(review): the keyword is not part of the query (the earlier
        # ``q=keyword`` variant of this call was disabled), so it only names
        # the output file -- confirm this is intended.
        response = api.news_api(
            domainurl=selected_domain_url,  # 'bbc.com', 'forbes.com', 'businessinsider.com'
            category=['business', 'technology', 'politics'],
            timeframe=48,
            language='en',
            full_content=True,
            size=10,
        )

        # Persist the raw response next to the others for the combine step.
        file_path = os.path.join(directory_path, f"response_{keyword}.json")
        with open(file_path, "w") as outfile:
            json.dump(response, outfile)
        print(f"News Response for keyword {keyword} is retrieved")
        keywords.remove(keyword)
def combine_responses_into_one(directory_path):
    """Merge the saved ``response_<keyword>.json`` files into one JSON list.

    Every article found under a file's ``results`` list is tagged with a
    ``file_keyword`` entry taken from the file name, and all articles are
    written to ``combined_news_response.json`` in the current working
    directory.

    Files that are not named ``response_*.json`` and responses without a
    ``results`` list are reported and skipped instead of aborting the merge.

    Args:
        directory_path: Directory holding the per-keyword response files.
    """
    prefix, suffix = 'response_', '.json'
    combined_json = []

    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        if not os.path.isfile(file_path):
            continue
        # Stray files have no keyword in their name and may not be JSON at
        # all, so they cannot take part in the merge.
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            print(f'{filename} is skipped: not a response file')
            continue

        with open(file_path, 'r') as file:
            current_json = json.load(file)

        results = current_json.get('results') if isinstance(current_json, dict) else None
        if not isinstance(results, list):
            print(f'{filename} is skipped: it has no results list')
            continue

        # Everything between the fixed prefix and the extension is the keyword.
        file_keyword = filename[len(prefix):-len(suffix)]
        for result in results:
            result['file_keyword'] = file_keyword
        combined_json.extend(results)
        print(f'{filename} is added to the combined json object')

    # Save the combined list as a single JSON file.
    with open('combined_news_response.json', 'w') as combined_file:
        json.dump(combined_json, combined_file, indent=4)
def convert_json_to_csv(file_name):
    """Export selected article fields from a JSON file to a CSV file.

    The JSON file is loaded with ``pandas.read_json`` and the columns
    ``title``, ``pubDate``, ``content``, ``country``, ``category`` and
    ``language`` are written (together with the DataFrame index) to
    ``combined_news_response.csv`` in the current working directory.

    Args:
        file_name: Path of the JSON file to convert.
    """
    columns = ['title', 'pubDate', 'content', 'country', 'category', 'language']
    csv_file_name = 'combined_news_response.csv'

    json_data_df = pd.read_json(file_name)
    # ``reindex`` keeps the column order and fills absent columns with NaN,
    # whereas ``df[columns]`` raises KeyError as soon as one column is
    # missing (for example when the combined result list is empty).
    json_data_df.reindex(columns=columns).to_csv(csv_file_name)
    print(f'{csv_file_name} is created')
#-------------------------------------First Function called from the UI----------------------------
# API key used to initialise the news client.  The NEWSDATA_API_KEY
# environment variable takes precedence so a deployment can supply its own
# key without editing this file.
# NOTE(review): the fallback literal is a credential committed to source
# control -- consider rotating it and removing it from the code.
NEWSDATA_API_KEY = os.environ.get("NEWSDATA_API_KEY") or "pub_2915202f68e543f70bb9aba9611735142c1fd"

# Keywords handed to retrieve_news_per_keyword; each one names a response file.
keywords = [
    "GDP",
    "CPI",
    "PPI",
    "Unemployment Rate",
    "Interest Rates",
    "Inflation",
    "Trade Balance",
    "Retail Sales",
    "Manufacturing Index",
    "Earnings Reports",
    "Revenue Growth",
    "Profit Margins",
    "Earnings Surprises",
    "Geopolitical Events",
    "Trade Tensions",
    "Elections",
    "Natural Disasters",
    "Global Health Crises",
    "Oil Prices",
    "Gold Prices",
    "Precious Metals",
    "Agricultural Commodities",
    "Federal Reserve",
    "ECB",
    "Forex Market",
    "Exchange Rates",
    "Currency Pairs",
    "Tech Company Earnings",
    "Tech Innovations",
    "Retail Trends",
    "Consumer Sentiment",
    "Financial Regulations",
    "Government Policies",
    "Technical Analysis",
    "Fundamental Analysis",
    "Cryptocurrency News",
    "Bitcoin",
    "Altcoins",
    "Cryptocurrency Regulations",
    "S&P 500",
    "Dow Jones",
    "NASDAQ",
    "Market Analysis",
    "Stock Market Indices",
]

# Maps the domain name selected in the UI to the URL passed to the news API.
domain_dict = {'bbc': 'bbc.com', 'forbes': 'forbes.com', 'businessinsider_us': 'businessinsider.com'}

# Directory in which the per-keyword API responses are stored.
directory_path = './data'
def call_functions(domain):
    """Prepare the combined news CSV and return its first rows for the UI.

    What happens depends on how many entries the data directory holds:

    * none: news is retrieved for ``domain``, then combined and converted;
    * exactly one: nothing is rebuilt, only a message is printed;
    * two or more: the existing responses are combined and converted again
      (``domain`` is not used on this path).

    Args:
        domain: Domain name forwarded to ``retrieve_news_per_keyword``.

    Returns:
        The first 10 rows of ``combined_news_response.csv`` as a DataFrame,
        or a message string when that CSV file does not exist.
    """
    creating_data_dir(directory_path)
    entries = os.listdir(directory_path)
    combined_json_path = './combined_news_response.json'

    if not entries:
        print(f"Directory '{directory_path}' is empty.")
        client = NewsDataApiClient(apikey=NEWSDATA_API_KEY)
        retrieve_news_per_keyword(client, keywords, domain)
        combine_responses_into_one(directory_path)
        convert_json_to_csv(combined_json_path)
    elif len(entries) == 1:
        print(f"Directory '{directory_path}' contains only one file.")
    else:
        print(f"Directory '{directory_path}' contains at least two files.")
        combine_responses_into_one(directory_path)
        convert_json_to_csv(combined_json_path)

    csv_file_name = "combined_news_response.csv"
    if not os.path.exists(csv_file_name):
        return f"CSV file '{csv_file_name}' not found."

    preview = pd.read_csv(csv_file_name)
    # The CSV is written with its index, which is read back as 'Unnamed: 0'.
    if 'Unnamed: 0' in preview.columns:
        preview = preview.drop('Unnamed: 0', axis=1)
    return preview.head(10)
#--------------------------------------------------------------------------------------
#------------------------ SENTIMENT ANALYZER------------------------------------------
#--------------------------------------------------------------------------------------
# English stopword list from the NLTK corpus.  It is the default stopword
# collection of stopwords_removal and is also handed to the TF-IDF
# vectorizer and the StopWordsRemoval pipeline step.
en_stopwords = stopwords.words('english')
#---------------- Data Preprocessing ----------
def re_breakline(text_list):
    """Replace each line feed and carriage return with a single space.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    line_breaks = re.compile('[\n\r]')
    return [line_breaks.sub(' ', text) for text in text_list]
def re_hyperlinks(text_list):
    """Replace every http/https URL with the token ``' link '``.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    # Raw string: '\(' and '\)' are invalid escape sequences in a normal
    # string literal and trigger a warning on recent Python versions.
    pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    return [re.sub(pattern, ' link ', r) for r in text_list]
def re_dates(text_list):
    """Replace day/month/year dates with the token ``' date '``.

    A date is a two-digit day (00-31), a two-digit month (00-12) and a two-
    to four-digit year, separated by ``/`` or ``.``.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    # Raw string: '\/', '\.' and '\d' are invalid escape sequences in a
    # normal string literal and trigger a warning on recent Python versions.
    pattern = r'([0-2][0-9]|(3)[0-1])(\/|\.)(((0)[0-9])|((1)[0-2]))(\/|\.)\d{2,4}'
    return [re.sub(pattern, ' date ', r) for r in text_list]
def re_money(text_list):
    """Replace monetary amounts with the token ``' paisa '``.

    An amount is an optional ``R``, a ``$``, optional spaces and digits that
    contain one ``,`` or ``.`` separator (``$1,200``, ``R$ 10.50``).  Amounts
    without a separator, such as ``$5``, are left untouched.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    # Raw string: '\$', '\d' and '\.' are invalid escape sequences in a
    # normal string literal and trigger a warning on recent Python versions.
    pattern = r'[R]{0,1}\$[ ]{0,}\d+(,|\.)\d+'
    return [re.sub(pattern, ' paisa ', r) for r in text_list]
def re_numbers(text_list):
    """Replace every run of ASCII digits with the token ``' num '``.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    digit_run = re.compile('[0-9]+')
    return [digit_run.sub(' num ', text) for text in text_list]
def re_negation(text_list):
    """Replace negation markers with the token ``' negate '``.

    The markers are ``n``, ``a``/``ã`` and ``o`` in sequence (any case), the
    letter ``ñ``/``Ñ``, and a lone ``n``/``N`` surrounded by spaces.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    negation = re.compile('([nN][ãÃaA][oO]|[ñÑ]| [nN] )')
    return [negation.sub(' negate ', text) for text in text_list]
def re_special_chars(text_list):
    """Replace every non-word character with a single space.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    # Raw string: '\W' is an invalid escape sequence in a normal string
    # literal and triggers a warning on recent Python versions.
    return [re.sub(r'\W', ' ', r) for r in text_list]
def re_whitespaces(text_list):
    """Collapse whitespace runs to one space and strip trailing blanks.

    Leading whitespace is collapsed to a single space but not removed.

    Args:
        text_list: Iterable of strings.

    Returns:
        A list with one cleaned string per input string.
    """
    # Raw strings: '\s' is an invalid escape sequence in a normal string
    # literal and triggers a warning on recent Python versions.
    white_spaces = [re.sub(r'\s+', ' ', r) for r in text_list]
    white_spaces_end = [re.sub(r'[ \t]+$', '', r) for r in white_spaces]
    return white_spaces_end
def stopwords_removal(text, cached_stopwords=en_stopwords):
    """Lower-case the whitespace-separated tokens of ``text`` and drop stopwords.

    Args:
        text: String to tokenise.
        cached_stopwords: Collection of lower-case words to discard.

    Returns:
        The remaining lower-cased tokens as a list, in their original order.
    """
    lowered_tokens = (token.lower() for token in text.split())
    return [token for token in lowered_tokens if token not in cached_stopwords]
def stemming_process(text, stemmer=RSLPStemmer()):
    """Stem each whitespace-separated token of ``text``.

    NOTE(review): NLTK documents RSLP as a stemmer for Portuguese while the
    stopword list in this file is English -- confirm this is intended.

    Args:
        text: String to tokenise.
        stemmer: Object exposing ``stem(token)``; the default instance is
            created once, when the function is defined.

    Returns:
        The stemmed tokens as a list, in their original order.
    """
    return list(map(stemmer.stem, text.split()))
class ApplyRegex(BaseEstimator, TransformerMixin):
    """Pipeline step that runs a set of text-cleaning callables in sequence.

    ``regex_transformers`` maps a step name to a callable that takes and
    returns a list of strings; the callables run in the mapping's iteration
    order, each receiving the previous one's output.
    """

    def __init__(self, regex_transformers):
        self.regex_transformers = regex_transformers

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Feed ``X`` through every configured callable and return the result."""
        cleaned = X
        for clean_step in self.regex_transformers.values():
            cleaned = clean_step(cleaned)
        return cleaned
# Pipeline step that strips stopwords from every document of the corpus
class StopWordsRemoval(BaseEstimator, TransformerMixin):
    """Remove the words in ``text_stopwords`` from each document."""

    def __init__(self, text_stopwords):
        self.text_stopwords = text_stopwords

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return one space-joined string of kept tokens per document in ``X``."""
        stop_list = self.text_stopwords
        filtered_docs = []
        for document in X:
            kept_tokens = stopwords_removal(document, stop_list)
            filtered_docs.append(' '.join(kept_tokens))
        return filtered_docs
# Pipeline step that stems every token of every document
class StemmingProcess(BaseEstimator, TransformerMixin):
    """Apply ``stemmer`` to each whitespace-separated token of each document."""

    def __init__(self, stemmer):
        self.stemmer = stemmer

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return one space-joined string of stemmed tokens per document in ``X``."""
        active_stemmer = self.stemmer
        stemmed_docs = []
        for document in X:
            stems = stemming_process(document, active_stemmer)
            stemmed_docs.append(' '.join(stems))
        return stemmed_docs
# Class for extracting features from corpus
class TextFeatureExtraction(BaseEstimator, TransformerMixin):
    """Pipeline step that turns documents into a dense feature array.

    ``vectorizer`` is any object with ``fit``, ``transform`` and
    ``fit_transform`` methods whose output has ``toarray()``.
    """

    def __init__(self, vectorizer):
        self.vectorizer = vectorizer

    def fit(self, X, y=None):
        """Fit the wrapped vectorizer on ``X`` and return the transformer."""
        # Learning belongs in fit(): the old transform() refitted on every
        # call, so features for new documents never used a previously
        # learned vocabulary.
        self.vectorizer.fit(X)
        return self

    def transform(self, X, y=None):
        """Return the feature matrix of ``X`` as a dense array."""
        if not hasattr(self.vectorizer, 'vocabulary_'):
            # Backward compatibility: callers that call transform() without
            # fit() still get the former fit-and-transform behaviour.
            return self.vectorizer.fit_transform(X).toarray()
        return self.vectorizer.transform(X).toarray()
#----------------------------Creating Pipeline for Preparing the data-----
# Defining regex transformers to be applied.
# The entry order matters: ApplyRegex runs these functions in the dict's
# iteration order, so dates and money amounts are replaced by their tokens
# before re_numbers rewrites the remaining digits, and punctuation is only
# stripped after the patterns that depend on it have run.
regex_transformers = {
    'break_line': re_breakline,
    'hiperlinks': re_hyperlinks,
    'dates': re_dates,
    'money': re_money,
    'numbers': re_numbers,
    'negation': re_negation,
    'special_chars': re_special_chars,
    'whitespaces': re_whitespaces
}
# Defining the vectorizer to extract features from text
# NOTE(review): min_df=7 presumably needs a term to occur in at least 7
# documents -- with small batches this may leave few or no features; confirm.
vectorizer = TfidfVectorizer(max_features=300, min_df=7, max_df=0.8, stop_words=en_stopwords)
# Building the Pipeline: regex cleaning -> stopword removal -> stemming ->
# feature extraction with the vectorizer defined above.
text_pipeline = Pipeline([
    ('regex', ApplyRegex(regex_transformers)),
    ('stopwords', StopWordsRemoval(en_stopwords)),
    ('stemming', StemmingProcess(RSLPStemmer())),
    ('text_features', TextFeatureExtraction(vectorizer))
])
#----------------- Analyzing the Sentiments of whole dataset-------
def sentiment_analyzer(csv_file_name='combined_news_response.csv'):
    """Predict a sentiment label for every article in a CSV file.

    The ``content`` column is run through ``text_pipeline`` and the features
    are classified by the model stored in ``Naive Bayes_model.joblib``.  The
    result is also written to ``sentiment.csv``.  Rows without content are
    dropped because the text-cleaning steps only accept strings.

    Args:
        csv_file_name: Path of the CSV file; it must have a ``content`` column.

    Returns:
        A DataFrame with the columns ``content`` and ``sentiment``
        (``'negative'``, ``'neutral'`` or ``'positive'``).
    """
    df = pd.read_csv(csv_file_name)
    # The saved index column is optional, as in call_functions; dropping it
    # unconditionally raised KeyError for CSVs written without an index.
    df = df.drop(columns='Unnamed: 0', errors='ignore')
    # Missing content is read back as NaN (a float) and would crash re.sub.
    df = df.dropna(subset=['content'])

    X = list(df['content'].values)

    # NOTE(review): fit_transform refits the vectorizer on the incoming
    # articles, so the features may not line up with those the saved model
    # was trained on -- confirm against the training code.
    X_processed = text_pipeline.fit_transform(X)

    # Load the saved model and use it for inference.
    loaded_model_nb = joblib.load("Naive Bayes_model.joblib")
    sentiments = loaded_model_nb.predict(X_processed)

    # Class index predicted by the model -> label.
    sentiment_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
    print(f"df['content'].values ==> {len(df['content'].values)} \n sentiments length ==> {len(sentiments)}")

    sentiment_df = pd.DataFrame({
        'content': df['content'].values,
        'sentiment': [sentiment_mapping[sent] for sent in sentiments]
    })
    sentiment_df.to_csv('sentiment.csv')
    print('Sentiment df saved as "sentiment.csv"')
    return sentiment_df
#----------------------------------------------------------------------------------------------
#----------------------------------DATA VISUALIZER---------------------------------------------
#----------------------------------------------------------------------------------------------
def get_senti_pct_distribution(expt_df):
    """Draw a pie chart of the share of each value in the ``sentiment`` column.

    The three colours are assigned to the slices in the order in which
    ``value_counts()`` returns the labels, not by label name.

    Args:
        expt_df: DataFrame with a ``sentiment`` column.

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as the current one.
    """
    counts = expt_df['sentiment'].value_counts()
    slice_colors = ['lightblue', 'limegreen', 'lightcoral']

    plt.figure(figsize=(8, 8))
    plt.pie(
        counts.values,
        labels=counts.index,
        colors=slice_colors,
        autopct='%1.1f%%',
        startangle=140,
    )
    # An equal aspect ratio keeps the pie circular.
    plt.axis('equal')
    plt.title('Sentiment Distribution for Labelled Data')
    return plt
def preprocessing_data(expt_df):
    """Clean the ``content`` column step by step, keeping every intermediate.

    Each cleaning function receives the output of the previous one and its
    result is stored in a column named after the function.  Finally the
    stopwords are removed and the result is stored in ``stopwords_removed``.

    Args:
        expt_df: DataFrame with a ``content`` column; modified in place.

    Returns:
        The same DataFrame object with the added columns.
    """
    cleaning_stages = (
        ('re_breakline', re_breakline),
        ('re_hyperlinks', re_hyperlinks),
        ('re_dates', re_dates),
        ('re_money', re_money),
        ('re_numbers', re_numbers),
        ('re_negation', re_negation),
        ('re_special_chars', re_special_chars),
        ('re_whitespaces', re_whitespaces),
    )

    texts = list(expt_df['content'].values)
    for column_name, clean in cleaning_stages:
        texts = clean(texts)
        expt_df[column_name] = texts

    # Stopword removal works on the fully cleaned text of each article.
    expt_df['stopwords_removed'] = [
        ' '.join(stopwords_removal(article)) for article in texts
    ]
    return expt_df
def generate_wc(processed_expt_df):
    """Build one word cloud per sentiment from the ``stopwords_removed`` column.

    Args:
        processed_expt_df: DataFrame with ``sentiment`` and
            ``stopwords_removed`` columns.

    Returns:
        A tuple ``(positive_wc, negative_wc, neutral_wc)`` of WordCloud
        objects coloured with the 'Blues', 'Reds' and 'Greens' colormaps.
    """
    sentiment_queries = (
        'sentiment == "positive"',
        'sentiment == "negative"',
        'sentiment == "neutral"',
    )
    colormaps = ('Blues', 'Reds', 'Greens')

    def _words_for(query):
        # All articles of one sentiment, flattened into a single word list.
        articles = list(processed_expt_df.query(query)['stopwords_removed'].values)
        return ' '.join(articles).split(' ')

    word_lists = [_words_for(query) for query in sentiment_queries]

    # Word -> number of occurrences, one counter per sentiment.
    frequencies = [Counter(words) for words in word_lists]

    clouds = [
        WordCloud(
            width=1280,
            height=720,
            collocations=False,
            random_state=42,
            colormap=colormap,
            background_color='white',
            max_words=50,
        ).generate_from_frequencies(frequency)
        for colormap, frequency in zip(colormaps, frequencies)
    ]
    positive_wc, negative_wc, neutral_wc = clouds
    return positive_wc, negative_wc, neutral_wc
# def plot_news_wc(positive_wc, negative_wc, neutral_wc):
# fig, axs = plt.subplots(1, 3, figsize=(20, 20))
# ax1 = axs[0]
# ax2 = axs[1]
# ax3 = axs[2]
# ax1.imshow(positive_wc)
# ax1.axis('off')
# ax1.set_title('WordCloud for Positive Words in News', size=18, pad=20)
# ax2.imshow(negative_wc)
# ax2.axis('off')
# ax2.set_title('WordCloud for Negative Words in News', size=18, pad=20)
# ax3.imshow(neutral_wc)
# ax3.axis('off')
# ax3.set_title('WordCloud for Neutral Words in News', size=18, pad=20)
# return fig
def plot_news_wc(positive_wc, negative_wc, neutral_wc):
    """Show the three word clouds stacked vertically in one figure.

    Args:
        positive_wc: Image-like word cloud for positive news.
        negative_wc: Image-like word cloud for negative news.
        neutral_wc: Image-like word cloud for neutral news.

    Returns:
        The matplotlib figure holding the three panels.
    """
    fig, axs = plt.subplots(3, 1, figsize=(10, 30))  # 3 rows, 1 column
    panels = (
        (positive_wc, 'WordCloud for Positive Words in News'),
        (negative_wc, 'WordCloud for Negative Words in News'),
        (neutral_wc, 'WordCloud for Neutral Words in News'),
    )
    for axis, (cloud, title) in zip(axs, panels):
        axis.imshow(cloud)
        axis.axis('off')
        axis.set_title(title, size=18, pad=20)
    return fig
def get_news_wc(expt_df):
    """Preprocess the articles, build the word clouds and plot them.

    Args:
        expt_df: DataFrame with ``content`` and ``sentiment`` columns; the
            preprocessing step adds columns to it in place.

    Returns:
        The figure produced by ``plot_news_wc``.
    """
    pos_cloud, neg_cloud, neu_cloud = generate_wc(preprocessing_data(expt_df))
    return plot_news_wc(pos_cloud, neg_cloud, neu_cloud)
def call_data_viz_func(plot_type):
    """Load ``sentiment.csv`` and render the requested visualisation.

    Args:
        plot_type: ``'percentage_plot'`` for the sentiment pie chart or
            ``'word_count_plot'`` for the word clouds.

    Returns:
        Whatever the selected plotting function returns.

    Raises:
        ValueError: If ``plot_type`` is neither of the two supported values.
    """
    # The file is read first, so a missing file fails before the type check.
    sentiment_frame = pd.read_csv('sentiment.csv')

    if plot_type == 'percentage_plot':
        return get_senti_pct_distribution(sentiment_frame)
    if plot_type == 'word_count_plot':
        return get_news_wc(sentiment_frame)
    raise ValueError("Unknown plot type selected")
#---------------------- GRADIO APP --------------------
# Three-step UI: retrieve news -> analyse sentiment -> visualise the result.
# Each step reads the file written by the previous one.
# NOTE(review): the original indentation of this section was lost; the
# nesting below (what sits inside the Accordion, Row and Column contexts) is
# reconstructed -- confirm against the deployed layout.
with gr.Blocks() as demo:
    gr.Markdown("# Welcome to News Retrieval and Sentiment Analyzer App a.k.a InfoMood Tracker")
    gr.Markdown("## Best tracker for your News around the globe ! ")
    with gr.Accordion("Steps to run the App"):
        gr.Markdown("1. Select the Domain from which you want to retrieve the news")
        gr.Markdown("2. Click on the `Retrieve news` to retrieve the news from the domain. You Should see that the result displayed in the form of Table")
        gr.Markdown("3. Click on the `Analyze Sentiment` to analyze the sentiments of the news retrieved.")
        gr.Markdown("4. Select the radio button `percentage_plot` or `word_count_plot`. Click on the `Vizualize data` to view the respective Vizualization. If needed click the `Clear` Button to clear the plot ")
        gr.Markdown("NOTE: Each depends on the file saved the it's previous step, so the sequence is important. For example, you can't get the data viz until and unless you have the Sentiment Analyzed File ")
        gr.Markdown("* App Link(Curernt page) : [Hugging Face Spaces link](https://huggingface.co/spaces/pknayak/sentiement_app)")
        gr.Markdown("* Documentation : [Notion Link](https://mekongdelta.notion.site/Documentation-for-the-InfoMood-app-e310b4fb371c446daa2405f7efdd5b13)")
    # GRADIO ROW FOR NEWS COLLECTOR
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            # The dropdown choices are the keys of domain_dict.
            ui_domain = gr.Dropdown(["forbes", "bbc", "businessinsider_us"], label="Select Domain")
            retrieve_button = gr.Button("Retrieve news")
            df_output = gr.Dataframe(type="pandas",wrap=True,label="News retrieved from the selected domain")
            retrieve_button.click(call_functions, inputs=ui_domain, outputs=df_output)
    # GRADIO ROW FOR ANALYSING SENTIMENT
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            # Hidden textbox that supplies the CSV path argument of sentiment_analyzer.
            ui_input = gr.Textbox(value='combined_news_response.csv' , visible=False)
            view_sentiment_bttn = gr.Button("Analyze Sentiment")
            # Rebinds df_output; the click handler above keeps the first table.
            df_output = gr.Dataframe(type="pandas",wrap=True, label="News along with Sentiment")
            view_sentiment_bttn.click(sentiment_analyzer, inputs=ui_input, outputs=df_output)
    # GRADIO ROW FOR DATA VISUALISATION
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            # The choices must match the plot types handled by call_data_viz_func.
            ui_plot_type = gr.Radio(label="Plot type",
                                    choices=["percentage_plot", "word_count_plot"],
                                    value='percentage_plot')
            data_viz_bt = gr.Button("Vizualize data")
            plt_output = gr.Plot(label="Data Vizualizer for the News App", show_label=True,)
            gr.ClearButton(plt_output)
            data_viz_bt.click(call_data_viz_func, inputs=ui_plot_type, outputs=plt_output)
# Runs at import time: the module is the app's entry script.
demo.launch(debug=True)