# Hugging Face Spaces page status when this file was captured: Runtime error
import gradio as gr | |
from newsdataapi import NewsDataApiClient | |
import os | |
import json | |
import pandas as pd | |
# ----------------imports for Sentiment Analyzer---------------------- | |
import re | |
from sklearn.pipeline import Pipeline | |
from sklearn.base import BaseEstimator, TransformerMixin | |
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer | |
import nltk | |
nltk.download('stopwords') | |
from nltk.corpus import stopwords | |
nltk.download('rslp') | |
from nltk.stem import RSLPStemmer | |
import joblib | |
# -------------------------------- imports for Data Visualization
from wordcloud import WordCloud | |
from collections import Counter | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from matplotlib.gridspec import GridSpec | |
import plotly.offline as py | |
import plotly.express as px | |
import plotly.graph_objs as go | |
#-------------------------------------------------------------------------------------- | |
#------------------------ NEWS DATA RETRIEVER------------------------------------------ | |
#-------------------------------------------------------------------------------------- | |
def creating_data_dir(directory_path):
    """Make sure ``directory_path`` exists and report the outcome on stdout.

    Args:
        directory_path: path of the directory to create (parents included).

    Returns:
        None. The result is only printed.
    """
    # exist_ok=True turns the call into a no-op when the directory is already there.
    os.makedirs(directory_path, exist_ok=True)

    # Verify the outcome instead of assuming makedirs succeeded.
    if not os.path.exists(directory_path):
        print(f"Failed to create directory '{directory_path}'.")
        return
    print(f"Directory '{directory_path}' created successfully.")
def retrieve_news_per_keyword(api, keywords, domain):
    """Query the news API once per keyword and save each response as JSON.

    One file named ``response_<keyword>.json`` is written per keyword into the
    module-level ``directory_path``.

    Args:
        api: client object exposing ``news_api(**params)``.
        keywords: iterable of keyword strings. It is NOT modified.
        domain: key into the module-level ``domain_dict``; the mapped value is
            sent as the ``domainurl`` request parameter.

    Raises:
        KeyError: if ``domain`` is not a key of ``domain_dict``.
    """
    selected_domain_url = domain_dict[domain]

    # Iterate over a snapshot and leave the caller's list untouched. The
    # previous version called keywords.remove(keyword) inside the loop, which
    # skipped every other keyword and shrank the shared module-level list a
    # little more on every call.
    for keyword in list(keywords):
        # NOTE(review): the request is not filtered by `keyword` (an earlier,
        # commented-out variant passed q=keyword together with country="us"),
        # so each file presumably holds the same domain-wide result set.
        # Confirm whether that is intended.
        response = api.news_api(
            domainurl=selected_domain_url,  # e.g. 'bbc.com', 'forbes.com', 'businessinsider.com'
            category=['business', 'technology', 'politics'],
            timeframe=48,
            language='en',
            full_content=True,
            size=10,
        )

        # Persist the raw response so later steps can work from disk.
        file_path = os.path.join(directory_path, f"response_{keyword}.json")
        with open(file_path, "w") as outfile:
            json.dump(response, outfile)
        print(f"News Response for keyword {keyword} is retrieved")
def combine_responses_into_one(directory_path):
    """Merge every JSON response file in ``directory_path`` into one JSON list.

    Each article dict found under a file's ``"results"`` key is tagged with a
    ``file_keyword`` entry derived from the file name
    (``response_<keyword>.json`` -> ``<keyword>``) and appended to a single
    list. The list is written to ``combined_news_response.json`` in the
    current working directory.

    Args:
        directory_path: directory holding the per-keyword response files.
    """
    combined_json = []

    # Sorted so the combined output does not depend on os.listdir() ordering.
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)
        stem, extension = os.path.splitext(filename)

        # Skip sub-directories and stray non-JSON files instead of crashing on them.
        if not os.path.isfile(file_path) or extension.lower() != '.json':
            continue

        with open(file_path, 'r') as file:
            current_json = json.load(file)

        # Everything after the first underscore is the keyword, so keywords
        # that themselves contain '_' or '.' are kept whole. A file without an
        # underscore falls back to its stem.
        _, separator, file_keyword = stem.partition('_')
        if not separator:
            file_keyword = stem

        # A response without (or with a null) "results" entry contributes nothing.
        results = current_json.get('results') or []
        for result in results:
            result['file_keyword'] = file_keyword
        combined_json.extend(results)
        print(f'{filename} is added to the combined json object')

    # Save the combined list as one JSON document.
    with open('combined_news_response.json', 'w') as combined_file:
        json.dump(combined_json, combined_file, indent=4)
def convert_json_to_csv(file_name):
    """Export selected columns of a JSON records file to CSV.

    Reads ``file_name`` with ``pandas.read_json`` and writes the columns
    ``title, pubDate, content, country, category, language`` (in that order,
    with the DataFrame index as the first CSV column) to
    ``combined_news_response.csv`` in the current working directory.

    Args:
        file_name: path of the JSON file to convert.
    """
    # Other fields seen in the data but not exported:
    # 'keywords', 'creator', 'description', 'file_keyword'.
    columns = ['title', 'pubDate', 'content', 'country', 'category', 'language']
    csv_file_name = 'combined_news_response.csv'

    json_data_df = pd.read_json(file_name)

    # reindex (rather than df[columns]) keeps the export working when a column
    # is absent from the input: the missing column is written as empty values
    # instead of raising KeyError.
    json_data_df.reindex(columns=columns).to_csv(csv_file_name)
    print(f'{csv_file_name} is created')
#-------------------------------------First Function called from the UI---------------------------- | |
# API key used to initialise the NewsData client.
# SECURITY: a credential committed to source control should be treated as
# leaked. The NEWSDATA_API_KEY environment variable takes precedence when it is
# set; the literal below is kept only so existing deployments keep working and
# should be rotated and removed.
NEWSDATA_API_KEY = (
    os.environ.get("NEWSDATA_API_KEY")
    or "pub_2915202f68e543f70bb9aba9611735142c1fd"
)

# Keywords used to name the per-keyword response files.
keywords = [
    "GDP", "CPI", "PPI", "Unemployment Rate", "Interest Rates", "Inflation",
    "Trade Balance", "Retail Sales", "Manufacturing Index", "Earnings Reports",
    "Revenue Growth", "Profit Margins", "Earnings Surprises",
    "Geopolitical Events", "Trade Tensions", "Elections", "Natural Disasters",
    "Global Health Crises", "Oil Prices", "Gold Prices", "Precious Metals",
    "Agricultural Commodities", "Federal Reserve", "ECB", "Forex Market",
    "Exchange Rates", "Currency Pairs", "Tech Company Earnings",
    "Tech Innovations", "Retail Trends", "Consumer Sentiment",
    "Financial Regulations", "Government Policies", "Technical Analysis",
    "Fundamental Analysis", "Cryptocurrency News", "Bitcoin", "Altcoins",
    "Cryptocurrency Regulations", "S&P 500", "Dow Jones", "NASDAQ",
    "Market Analysis", "Stock Market Indices",
]

# UI domain name -> URL sent to the API as `domainurl`.
domain_dict = {
    'bbc': 'bbc.com',
    'forbes': 'forbes.com',
    'businessinsider_us': 'businessinsider.com',
}

# Directory in which the per-keyword JSON responses are stored.
directory_path = './data'
def call_functions(domain):
    """Gradio callback: make sure news data is on disk and return a preview.

    * empty data directory     -> fetch from the API, then combine and convert
    * two or more entries      -> combine and convert what is already there
    * exactly one entry        -> only report it; nothing is rebuilt

    Args:
        domain: domain name selected in the UI (a key of ``domain_dict``).

    Returns:
        The first 10 rows of ``combined_news_response.csv`` as a DataFrame,
        or an error string when that CSV file does not exist.
    """
    creating_data_dir(directory_path)
    entry_count = len(os.listdir(directory_path))
    combined_json_path = './combined_news_response.json'

    if entry_count == 0:
        print(f"Directory '{directory_path}' is empty.")
        client = NewsDataApiClient(apikey=NEWSDATA_API_KEY)
        retrieve_news_per_keyword(client, keywords, domain)
        combine_responses_into_one(directory_path)
        convert_json_to_csv(combined_json_path)
    elif entry_count == 1:
        print(f"Directory '{directory_path}' contains only one file.")
    else:
        print(f"Directory '{directory_path}' contains at least two files.")
        combine_responses_into_one(directory_path)
        convert_json_to_csv(combined_json_path)

    # Read the combined CSV file back and hand a preview to the UI.
    csv_file_name = "combined_news_response.csv"
    if not os.path.exists(csv_file_name):
        return f"CSV file '{csv_file_name}' not found."

    preview_source = pd.read_csv(csv_file_name)
    # The saved index column comes back as 'Unnamed: 0'; hide it when present.
    preview_source = preview_source.drop(columns=['Unnamed: 0'], errors='ignore')
    return preview_source.head(10)  # Adjust the number of rows as needed
#-------------------------------------------------------------------------------------- | |
#------------------------ SENTIMENT ANALYZER------------------------------------------ | |
#-------------------------------------------------------------------------------------- | |
# English stop-word list from the NLTK corpus; used as the default for
# stopwords_removal() and passed to the vectorizer and the text pipeline below.
en_stopwords = stopwords.words('english')
#---------------- Data Preprocessing ----------
def re_breakline(text_list):
    """Replace each newline / carriage-return character with a single space.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one cleaned string per input string.
    """
    line_break = re.compile('[\n\r]')
    return [line_break.sub(' ', text) for text in text_list]
def re_hyperlinks(text_list):
    """Replace every http/https URL with the token ``' link '``.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    # Raw string: the previous plain literal contained the invalid escape
    # sequences "\(" and "\)" (a SyntaxWarning on recent Python versions).
    # The regex itself is unchanged.
    pattern = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    return [pattern.sub(' link ', r) for r in text_list]
def re_dates(text_list):
    """Replace ``dd/mm/yy(yy)`` or ``dd.mm.yy(yy)`` dates with ``' date '``.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    # Raw string: "\/", "\." and "\d" are invalid escape sequences in a plain
    # literal. The regex itself is unchanged.
    pattern = re.compile(
        r'([0-2][0-9]|(3)[0-1])(\/|\.)(((0)[0-9])|((1)[0-2]))(\/|\.)\d{2,4}'
    )
    return [pattern.sub(' date ', r) for r in text_list]
def re_money(text_list):
    """Replace ``$``/``R$`` amounts that have a decimal part with ``' paisa '``.

    Only amounts written with a ``,`` or ``.`` separator followed by digits
    are matched (``$12.50``, ``R$ 1,99``); a bare ``$100`` is left untouched.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    # Raw string: "\$", "\d" and "\." are invalid escape sequences in a plain
    # literal. The regex itself is unchanged.
    pattern = re.compile(r'[R]{0,1}\$[ ]{0,}\d+(,|\.)\d+')
    return [pattern.sub(' paisa ', r) for r in text_list]
def re_numbers(text_list):
    """Replace every run of ASCII digits with the token ``' num '``.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    digit_run = re.compile('[0-9]+')
    return [digit_run.sub(' num ', text) for text in text_list]
def re_negation(text_list):
    """Replace negation markers with the token ``' negate '``.

    A marker is ``n`` + ``a``/``ã`` + ``o`` in any letter case, the character
    ``ñ``/``Ñ``, or a lone ``n``/``N`` surrounded by single spaces.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    negation = re.compile('([nN][ãÃaA][oO]|[ñÑ]| [nN] )')
    return [negation.sub(' negate ', text) for text in text_list]
def re_special_chars(text_list):
    """Replace every non-word character (regex ``\\W``) with a space.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    # Raw string: "\W" is an invalid escape sequence in a plain literal.
    pattern = re.compile(r'\W')
    return [pattern.sub(' ', r) for r in text_list]
def re_whitespaces(text_list):
    """Collapse whitespace runs to one space and strip trailing whitespace.

    Leading whitespace is collapsed to a single space but not removed.

    Args:
        text_list: iterable of strings.

    Returns:
        A new list with one processed string per input string.
    """
    # Raw string: "\s" is an invalid escape sequence in a plain literal.
    whitespace_run = re.compile(r'\s+')
    trailing_blanks = re.compile(r'[ \t]+$')
    white_spaces = [whitespace_run.sub(' ', r) for r in text_list]
    return [trailing_blanks.sub('', r) for r in white_spaces]
def stopwords_removal(text, cached_stopwords=en_stopwords):
    """Lower-case the whitespace-separated tokens of ``text`` and drop stop words.

    Args:
        text: string to tokenize with ``str.split()``.
        cached_stopwords: container of lower-case stop words; defaults to the
            module-level ``en_stopwords``.

    Returns:
        List of the lower-cased tokens that are not in ``cached_stopwords``.
    """
    kept_tokens = []
    for token in text.split():
        lowered = token.lower()
        if lowered not in cached_stopwords:
            kept_tokens.append(lowered)
    return kept_tokens
def stemming_process(text, stemmer=RSLPStemmer()):
    """Stem each whitespace-separated token of ``text``.

    Args:
        text: string to tokenize with ``str.split()``.
        stemmer: object exposing ``stem(token)``; defaults to one shared
            ``RSLPStemmer`` instance created when the function is defined.

    Returns:
        List of stemmed tokens, in input order.
    """
    return list(map(stemmer.stem, text.split()))
class ApplyRegex(BaseEstimator, TransformerMixin):
    """Pipeline step that runs a sequence of list-to-list regex functions."""

    def __init__(self, regex_transformers):
        # Mapping of step name -> function taking and returning a list of strings.
        self.regex_transformers = regex_transformers

    def fit(self, X, y=None):
        """Nothing to learn; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Feed ``X`` through every regex function in mapping order."""
        transformed = X
        for regex_function in self.regex_transformers.values():
            transformed = regex_function(transformed)
        return transformed
# Pipeline step that removes stop words from each document of the corpus
class StopWordsRemoval(BaseEstimator, TransformerMixin):
    """Drop stop words from every document and re-join the remaining tokens."""

    def __init__(self, text_stopwords):
        # Container of stop words handed to stopwords_removal().
        self.text_stopwords = text_stopwords

    def fit(self, X, y=None):
        """Nothing to learn; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return one space-joined, stop-word-free string per document in ``X``."""
        cleaned_documents = []
        for comment in X:
            tokens = stopwords_removal(comment, self.text_stopwords)
            cleaned_documents.append(' '.join(tokens))
        return cleaned_documents
# Pipeline step that applies the stemming process to each document
class StemmingProcess(BaseEstimator, TransformerMixin):
    """Stem every token of every document and re-join the stems."""

    def __init__(self, stemmer):
        # Stemmer object handed to stemming_process().
        self.stemmer = stemmer

    def fit(self, X, y=None):
        """Nothing to learn; return ``self`` unchanged."""
        return self

    def transform(self, X, y=None):
        """Return one space-joined string of stems per document in ``X``."""
        stemmed_documents = []
        for comment in X:
            stems = stemming_process(comment, self.stemmer)
            stemmed_documents.append(' '.join(stems))
        return stemmed_documents
# Class for extracting features from corpus
class TextFeatureExtraction(BaseEstimator, TransformerMixin):
    """Pipeline step that turns a list of documents into a dense feature array.

    Wraps a vectorizer object exposing ``fit``, ``transform`` and
    ``fit_transform`` (the file uses a ``TfidfVectorizer``).
    """

    def __init__(self, vectorizer):
        self.vectorizer = vectorizer

    def fit(self, X, y=None):
        """Fit the wrapped vectorizer on ``X`` and return ``self``."""
        # Learning happens here so that a later transform() reuses what was
        # learned. Previously fit() was a no-op and every transform() call
        # re-fitted the vectorizer on whatever data it was given.
        self.vectorizer.fit(X)
        return self

    def transform(self, X, y=None):
        """Vectorize ``X`` and return the result as a dense array."""
        if not hasattr(self.vectorizer, 'vocabulary_'):
            # Backward compatibility: callers that never called fit() used to
            # get a fit-and-transform in one step; keep that working.
            return self.vectorizer.fit_transform(X).toarray()
        return self.vectorizer.transform(X).toarray()
#---------------------------- Building the data-preparation pipeline -----
# Regex clean-up functions, applied in this order by the ApplyRegex step
regex_transformers = dict(
    break_line=re_breakline,
    hiperlinks=re_hyperlinks,
    dates=re_dates,
    money=re_money,
    numbers=re_numbers,
    negation=re_negation,
    special_chars=re_special_chars,
    whitespaces=re_whitespaces,
)

# TF-IDF vectorizer that turns the cleaned text into numeric features
vectorizer = TfidfVectorizer(
    max_features=300,
    min_df=7,
    max_df=0.8,
    stop_words=en_stopwords,
)

# Pipeline steps: regex clean-up -> stop-word removal -> stemming -> vectorizing
pipeline_steps = [
    ('regex', ApplyRegex(regex_transformers)),
    ('stopwords', StopWordsRemoval(en_stopwords)),
    ('stemming', StemmingProcess(RSLPStemmer())),
    ('text_features', TextFeatureExtraction(vectorizer)),
]
text_pipeline = Pipeline(pipeline_steps)
#----------------- Analyzing the Sentiments of whole dataset-------
def sentiment_analyzer(csv_file_name='combined_news_response.csv'):
    """Predict a sentiment label for every article in ``csv_file_name``.

    The ``content`` column is run through the module-level ``text_pipeline``
    and classified with the model stored in ``"Naive Bayes_model.joblib"``.
    The result is also written to ``sentiment.csv``.

    Args:
        csv_file_name: CSV file with at least a ``content`` column.

    Returns:
        DataFrame with the columns ``content`` and ``sentiment``
        (``'negative'``, ``'neutral'`` or ``'positive'``). Rows whose content
        is missing are left out.
    """
    df = pd.read_csv(csv_file_name)
    # The saved index column is optional, as in call_functions(); dropping it
    # unconditionally raised KeyError for CSVs written without an index.
    df = df.drop(columns=['Unnamed: 0'], errors='ignore')
    # Articles without text cannot be analysed; as NaN floats they crashed the
    # regex steps of the pipeline.
    df = df.dropna(subset=['content'])
    contents = df['content'].astype(str).tolist()

    sentiment_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
    if contents:
        # NOTE(review): fit_transform re-fits the TF-IDF step on this batch, so
        # the features presumably do not line up with the vocabulary the saved
        # model was trained on. Confirm, and consider shipping the fitted
        # pipeline alongside the model.
        X_processed = text_pipeline.fit_transform(contents)
        loaded_model_nb = joblib.load("Naive Bayes_model.joblib")
        sentiments = loaded_model_nb.predict(X_processed)
    else:
        # Nothing to classify; skip the pipeline and the model entirely.
        sentiments = []

    print(f"df['content'].values ==> {len(contents)} \n sentiments length ==> {len(sentiments)}")

    sentiment_df = pd.DataFrame({
        'content': contents,
        'sentiment': [sentiment_mapping[sent] for sent in sentiments],
    })
    sentiment_df.to_csv('sentiment.csv')
    print('Sentiment df saved as "sentiment.csv"')
    return sentiment_df
#---------------------------------------------------------------------------------------------- | |
#----------------------------------DATA VISUALIZER---------------------------------------------
#---------------------------------------------------------------------------------------------- | |
def get_senti_pct_distribution(expt_df):
    """Draw a pie chart of how often each sentiment label occurs.

    Args:
        expt_df: DataFrame with a ``sentiment`` column.

    Returns:
        The ``matplotlib.pyplot`` module, with the new figure as its current
        figure.
    """
    counts = expt_df['sentiment'].value_counts()

    # One slice per label, in value_counts() order. Colours are assigned by
    # position, not by label.
    plt.figure(figsize=(8, 8))
    plt.pie(
        counts.values,
        labels=counts.index,
        colors=['lightblue', 'limegreen', 'lightcoral'],
        autopct='%1.1f%%',
        startangle=140,
    )
    plt.axis('equal')  # equal aspect ratio keeps the pie circular
    plt.title('Sentiment Distribution for Labelled Data')
    return plt
def preprocessing_data(expt_df):
    """Run the text clean-up chain on ``content`` and keep every stage.

    Each regex step works on the output of the previous one, and its result
    is stored in a column named after the step. A final ``stopwords_removed``
    column holds the fully cleaned text without stop words.

    Args:
        expt_df: DataFrame with a ``content`` column. It is modified in place.

    Returns:
        The same ``expt_df`` object with the extra columns added.
    """
    # (output column, cleaning function) pairs in the order they are chained.
    regex_steps = (
        ('re_breakline', re_breakline),
        ('re_hyperlinks', re_hyperlinks),
        ('re_dates', re_dates),
        ('re_money', re_money),
        ('re_numbers', re_numbers),
        ('re_negation', re_negation),
        ('re_special_chars', re_special_chars),
        ('re_whitespaces', re_whitespaces),
    )

    current_texts = list(expt_df['content'].values)
    for column_name, regex_step in regex_steps:
        current_texts = regex_step(current_texts)
        expt_df[column_name] = current_texts

    # Stop words are removed last, from the fully cleaned text.
    expt_df['stopwords_removed'] = [
        ' '.join(stopwords_removal(text)) for text in current_texts
    ]
    return expt_df
def generate_wc(processed_expt_df):
    """Build one word cloud per sentiment class.

    Args:
        processed_expt_df: DataFrame with ``sentiment`` and
            ``stopwords_removed`` columns.

    Returns:
        Tuple ``(positive_wc, negative_wc, neutral_wc)`` of ``WordCloud``
        objects.
    """

    def _word_counts(label):
        # Count the space-separated words of every article with this sentiment.
        texts = list(
            processed_expt_df.query(f'sentiment == "{label}"')['stopwords_removed'].values
        )
        return Counter(' '.join(texts).split(' '))

    def _cloud(frequencies, colormap):
        # Same rendering settings for every class; only the colour map differs.
        return WordCloud(
            width=1280,
            height=720,
            collocations=False,
            random_state=42,
            colormap=colormap,
            background_color='white',
            max_words=50,
        ).generate_from_frequencies(frequencies)

    positive_dict = _word_counts('positive')
    negative_dict = _word_counts('negative')
    neutral_dict = _word_counts('neutral')

    positive_wc = _cloud(positive_dict, 'Blues')
    negative_wc = _cloud(negative_dict, 'Reds')
    neutral_wc = _cloud(neutral_dict, 'Greens')
    return positive_wc, negative_wc, neutral_wc
# def plot_news_wc(positive_wc, negative_wc, neutral_wc): | |
# fig, axs = plt.subplots(1, 3, figsize=(20, 20)) | |
# ax1 = axs[0] | |
# ax2 = axs[1] | |
# ax3 = axs[2] | |
# ax1.imshow(positive_wc) | |
# ax1.axis('off') | |
# ax1.set_title('WordCloud for Positive Words in News', size=18, pad=20) | |
# ax2.imshow(negative_wc) | |
# ax2.axis('off') | |
# ax2.set_title('WordCloud for Negative Words in News', size=18, pad=20) | |
# ax3.imshow(neutral_wc) | |
# ax3.axis('off') | |
# ax3.set_title('WordCloud for Neutral Words in News', size=18, pad=20) | |
# return fig | |
def plot_news_wc(positive_wc, negative_wc, neutral_wc):
    """Stack the three word clouds vertically in one figure.

    Args:
        positive_wc: image for the top panel.
        negative_wc: image for the middle panel.
        neutral_wc: image for the bottom panel.

    Returns:
        The matplotlib figure holding the three panels.
    """
    fig, axs = plt.subplots(3, 1, figsize=(10, 30))  # 3 rows, 1 column

    panels = (
        (positive_wc, 'WordCloud for Positive Words in News'),
        (negative_wc, 'WordCloud for Negative Words in News'),
        (neutral_wc, 'WordCloud for Neutral Words in News'),
    )
    for ax, (cloud, title) in zip(axs, panels):
        ax.imshow(cloud)
        ax.axis('off')
        ax.set_title(title, size=18, pad=20)
    return fig
def get_news_wc(expt_df):
    """Clean the articles, build the per-sentiment word clouds and plot them.

    Args:
        expt_df: DataFrame with ``content`` and ``sentiment`` columns.

    Returns:
        The figure returned by ``plot_news_wc``.
    """
    clouds = generate_wc(preprocessing_data(expt_df))
    return plot_news_wc(*clouds)
def call_data_viz_func(plot_type):
    """Gradio callback: build the requested plot from ``sentiment.csv``.

    Args:
        plot_type: ``'percentage_plot'`` or ``'word_count_plot'``.

    Returns:
        Whatever the selected plotting function returns.

    Raises:
        ValueError: if ``plot_type`` is neither of the two known values.
    """
    expt_df = pd.read_csv('sentiment.csv')

    if plot_type == 'word_count_plot':
        return get_news_wc(expt_df)
    if plot_type == 'percentage_plot':
        return get_senti_pct_distribution(expt_df)
    raise ValueError("Unknown plot type selected")
#---------------------- GRADIO APP --------------------
# NOTE(review): the nesting below is reconstructed from statement order because
# the captured source had lost its indentation. Confirm against the live app.
with gr.Blocks() as demo:
    gr.Markdown("# Welcome to News Retrieval and Sentiment Analyzer App a.k.a InfoMood Tracker")
    gr.Markdown("## Best tracker for your News around the globe ! ")

    # Collapsible usage instructions, rendered one Markdown element per entry.
    with gr.Accordion("Steps to run the App"):
        for step_text in (
            "1. Select the Domain from which you want to retrieve the news",
            "2. Click on the `Retrieve news` to retrieve the news from the domain. You Should see that the result displayed in the form of Table",
            "3. Click on the `Analyze Sentiment` to analyze the sentiments of the news retrieved.",
            "4. Select the radio button `percentage_plot` or `word_count_plot`. Click on the `Vizualize data` to view the respective Vizualization. If needed click the `Clear` Button to clear the plot ",
            "NOTE: Each depends on the file saved the it's previous step, so the sequence is important. For example, you can't get the data viz until and unless you have the Sentiment Analyzed File ",
            "* App Link(Curernt page) : [Hugging Face Spaces link](https://huggingface.co/spaces/pknayak/sentiement_app)",
            "* Documentation : [Notion Link](https://mekongdelta.notion.site/Documentation-for-the-InfoMood-app-e310b4fb371c446daa2405f7efdd5b13)",
        ):
            gr.Markdown(step_text)

    # Row 1: news collector
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_domain = gr.Dropdown(["forbes", "bbc", "businessinsider_us"], label="Select Domain")
            retrieve_button = gr.Button("Retrieve news")
            df_output = gr.Dataframe(
                type="pandas",
                wrap=True,
                label="News retrieved from the selected domain",
            )
            retrieve_button.click(fn=call_functions, inputs=ui_domain, outputs=df_output)

    # Row 2: sentiment analysis
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            # Hidden textbox that feeds the CSV file name to the analyzer.
            ui_input = gr.Textbox(value='combined_news_response.csv', visible=False)
            view_sentiment_bttn = gr.Button("Analyze Sentiment")
            df_output = gr.Dataframe(
                type="pandas",
                wrap=True,
                label="News along with Sentiment",
            )
            view_sentiment_bttn.click(fn=sentiment_analyzer, inputs=ui_input, outputs=df_output)

    # Row 3: data visualization
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_plot_type = gr.Radio(
                label="Plot type",
                choices=["percentage_plot", "word_count_plot"],
                value='percentage_plot',
            )
            data_viz_bt = gr.Button("Vizualize data")
            plt_output = gr.Plot(label="Data Vizualizer for the News App", show_label=True)
            gr.ClearButton(plt_output)
            data_viz_bt.click(fn=call_data_viz_func, inputs=ui_plot_type, outputs=plt_output)

demo.launch(debug=True)