import gradio as gr
from newsdataapi import NewsDataApiClient
import os
import json
import pandas as pd

# ----- Imports for the Sentiment Analyzer -----
import re
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
nltk.download('rslp')
from nltk.stem import RSLPStemmer
import joblib
# --------------------------------------------------------------------------------------
# ------------------------ NEWS DATA RETRIEVER ------------------------------------------
# --------------------------------------------------------------------------------------
def creating_data_dir(directory_path):
    # os.makedirs() with exist_ok=True runs without error if the directory already exists
    os.makedirs(directory_path, exist_ok=True)
    # Confirm the directory now exists
    if os.path.exists(directory_path):
        print(f"Directory '{directory_path}' created successfully.")
    else:
        print(f"Failed to create directory '{directory_path}'.")
def retrieve_news_per_keyword(api, keywords, domain):
    selected_domain_url = domain_dict[domain]
    # Iterate over a snapshot: removing items from the list being iterated
    # over would silently skip every other keyword
    for keyword in list(keywords):
        response = api.news_api(
            q=keyword,
            domainurl=[selected_domain_url],
            category='business',
            timeframe=48,
            language='en',
            full_content=True,
            size=10
        )
        # Write the raw response for this keyword to its own JSON file
        file_path = os.path.join(directory_path, f"response_{keyword}.json")
        with open(file_path, "w") as outfile:
            json.dump(response, outfile)
        print(f"News response for keyword '{keyword}' is retrieved")
        keywords.remove(keyword)
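
# For reference, each saved file is a raw NewsData.io response. The combining
# step below relies on the 'results' key; the rest of this sketch is an
# assumed shape for illustration, not the full schema:
# {
#     "status": "success",
#     "totalResults": 10,
#     "results": [
#         {"title": "...", "pubDate": "...", "content": "...",
#          "country": ["..."], "category": ["business"], "language": "english"},
#         ...
#     ]
# }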
def combine_responses_into_one(directory_path):
    # Collect all file names in the data directory
    file_list = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
    # Combined JSON data across all per-keyword files
    combined_json = []
    for filename in file_list:
        # Read the current JSON file
        with open(os.path.join(directory_path, filename), 'r') as file:
            current_json = json.load(file)
        # Recover the keyword from the filename: "response_<keyword>.json"
        file_keyword = os.path.splitext(filename)[0].split('_', 1)[1]
        # Tag each result with the keyword it was retrieved for
        for result in current_json['results']:
            result['file_keyword'] = file_keyword
        # Extend the combined list with the results from the current JSON
        combined_json.extend(current_json['results'])
        print(f'{filename} is added to the combined JSON object')
    # Save the combined results as a single JSON file
    with open('combined_news_response.json', 'w') as combined_file:
        json.dump(combined_json, combined_file, indent=4)
def convert_json_to_csv(file_name):
    json_data_df = pd.read_json(file_name)
    # Other available fields include: 'keywords', 'creator', 'description', 'file_keyword'
    columns = ['title', 'pubDate', 'content', 'country', 'category', 'language']
    csv_file_name = 'combined_news_response.csv'
    # index=False avoids writing the stray 'Unnamed: 0' index column
    json_data_df[columns].to_csv(csv_file_name, index=False)
    print(f'{csv_file_name} is created')
# ------------------------------------- First function called from the UI ----------------------------
# API key authorization: read the key from the environment (e.g. a Space secret)
# rather than hardcoding it in source
NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "")
keywords = [
    # Macroeconomic indicators
    "GDP", "CPI", "PPI", "Unemployment Rate", "Interest Rates", "Inflation",
    "Trade Balance", "Retail Sales", "Manufacturing Index",
    # Corporate earnings
    "Earnings Reports", "Revenue Growth", "Profit Margins", "Earnings Surprises",
    # World events
    "Geopolitical Events", "Trade Tensions", "Elections", "Natural Disasters", "Global Health Crises",
    # Commodities
    "Oil Prices", "Gold Prices", "Precious Metals", "Agricultural Commodities",
    # Central banks and currencies
    "Federal Reserve", "ECB", "Forex Market", "Exchange Rates", "Currency Pairs",
    # Sectors and analysis
    "Tech Company Earnings", "Tech Innovations", "Retail Trends", "Consumer Sentiment",
    "Financial Regulations", "Government Policies", "Technical Analysis", "Fundamental Analysis",
    # Crypto and indices
    "Cryptocurrency News", "Bitcoin", "Altcoins", "Cryptocurrency Regulations",
    "S&P 500", "Dow Jones", "NASDAQ", "Market Analysis", "Stock Market Indices"
]
domain_dict = {'bbc': 'bbc.com', 'forbes': 'forbes.com', 'businessinsider_us': 'businessinsider.com'}
# Directory where the per-keyword responses are stored
directory_path = './data'
def call_functions(domain):
    creating_data_dir(directory_path)
    items = os.listdir(directory_path)
    file_name = './combined_news_response.json'
    if len(items) == 0:
        # No cached responses: retrieve fresh news, then combine and convert
        print(f"Directory '{directory_path}' is empty.")
        api = NewsDataApiClient(apikey=NEWSDATA_API_KEY)
        retrieve_news_per_keyword(api, keywords, domain)
        combine_responses_into_one(directory_path)
        convert_json_to_csv(file_name)
    elif len(items) >= 2:
        # Cached responses exist: just recombine and convert them
        print(f"Directory '{directory_path}' contains at least two files.")
        combine_responses_into_one(directory_path)
        convert_json_to_csv(file_name)
    else:
        print(f"Directory '{directory_path}' contains only one file.")
    # Read the combined CSV file and return the first few rows for display
    csv_file_name = "combined_news_response.csv"
    if os.path.exists(csv_file_name):
        df = pd.read_csv(csv_file_name)
        # Drop the stray index column if an older CSV still carries one
        if 'Unnamed: 0' in df.columns:
            df.drop('Unnamed: 0', axis=1, inplace=True)
        first_few_rows = df.head(10)  # Adjust the number of rows as needed
        return first_few_rows
    else:
        return f"CSV file '{csv_file_name}' not found."
# --------------------------------------------------------------------------------------
# ------------------------ SENTIMENT ANALYZER ------------------------------------------
# --------------------------------------------------------------------------------------
# ---------------- Data Preprocessing ----------
def re_breakline(text_list):
    # Replace line breaks with spaces
    return [re.sub('[\n\r]', ' ', r) for r in text_list]

def re_hyperlinks(text_list):
    # Replace URLs with the token ' link '
    pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    return [re.sub(pattern, ' link ', r) for r in text_list]

def re_dates(text_list):
    # Replace dd/mm/yyyy-style dates with the token ' date '
    pattern = r'([0-2][0-9]|(3)[0-1])(\/|\.)(((0)[0-9])|((1)[0-2]))(\/|\.)\d{2,4}'
    return [re.sub(pattern, ' date ', r) for r in text_list]

def re_money(text_list):
    # Replace monetary amounts (e.g. "$ 1,200" or "R$1.200") with the token ' paisa '
    pattern = r'[R]{0,1}\$[ ]{0,}\d+(,|\.)\d+'
    return [re.sub(pattern, ' paisa ', r) for r in text_list]

def re_numbers(text_list):
    # Replace remaining digit runs with the token ' num '
    return [re.sub('[0-9]+', ' num ', r) for r in text_list]

def re_negation(text_list):
    # Replace (Portuguese) negation words such as "não" with the token ' negate '
    return [re.sub('([nN][ãÃaA][oO]|[ñÑ]| [nN] )', ' negate ', r) for r in text_list]

def re_special_chars(text_list):
    # Replace non-word characters with spaces
    return [re.sub(r'\W', ' ', r) for r in text_list]

def re_whitespaces(text_list):
    # Collapse repeated whitespace and strip trailing spaces
    white_spaces = [re.sub(r'\s+', ' ', r) for r in text_list]
    white_spaces_end = [re.sub(r'[ \t]+$', '', r) for r in white_spaces]
    return white_spaces_end
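
# Illustrative (commented) example of chaining the helpers above; the sample
# string and outputs are assumptions for illustration, not from a real run:
#   s = ["Stocks rose 3%\non 01/02/2024, see https://example.com ($ 1,200 cap)"]
#   s = re_breakline(s)   # newline -> space
#   s = re_hyperlinks(s)  # URL -> ' link '
#   s = re_dates(s)       # '01/02/2024' -> ' date '
#   s = re_money(s)       # '$ 1,200' -> ' paisa '
#   s = re_numbers(s)     # remaining digits -> ' num '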
def stopwords_removal(text, cached_stopwords=stopwords.words('english')):
    return [c.lower() for c in text.split() if c.lower() not in cached_stopwords]

def stemming_process(text, stemmer=RSLPStemmer()):
    # RSLP is a Portuguese-language stemmer; it is used here to match the
    # preprocessing the saved model was presumably trained with
    return [stemmer.stem(c) for c in text.split()]

# Get English stopwords
en_stopwords = stopwords.words('english')
# Class for applying a set of regex functions over the corpus
class ApplyRegex(BaseEstimator, TransformerMixin):
    def __init__(self, regex_transformers):
        self.regex_transformers = regex_transformers

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Apply every regex function in the regex_transformers dictionary, in order
        for regex_name, regex_function in self.regex_transformers.items():
            X = regex_function(X)
        return X

# Class for stopwords removal from the corpus
class StopWordsRemoval(BaseEstimator, TransformerMixin):
    def __init__(self, text_stopwords):
        self.text_stopwords = text_stopwords

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return [' '.join(stopwords_removal(comment, self.text_stopwords)) for comment in X]

# Class for applying the stemming process
class StemmingProcess(BaseEstimator, TransformerMixin):
    def __init__(self, stemmer):
        self.stemmer = stemmer

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return [' '.join(stemming_process(comment, self.stemmer)) for comment in X]

# Class for extracting features from the corpus
class TextFeatureExtraction(BaseEstimator, TransformerMixin):
    def __init__(self, vectorizer):
        self.vectorizer = vectorizer

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        # Caveat: fit_transform refits the vectorizer on whatever corpus is passed
        # in, so the learned vocabulary can differ from the one the saved model was
        # trained on; a persisted, pre-fitted vectorizer would avoid this
        return self.vectorizer.fit_transform(X).toarray()
# ---------------------------- Creating the pipeline for preparing the data -----
# Regex transformers to be applied, in order
regex_transformers = {
    'break_line': re_breakline,
    'hyperlinks': re_hyperlinks,
    'dates': re_dates,
    'money': re_money,
    'numbers': re_numbers,
    'negation': re_negation,
    'special_chars': re_special_chars,
    'whitespaces': re_whitespaces
}

# Vectorizer to extract features from text
vectorizer = TfidfVectorizer(max_features=300, min_df=7, max_df=0.8, stop_words=en_stopwords)

# Building the pipeline; the Portuguese stopwords and stemmer mirror the
# preprocessing the saved model appears to have been trained with
text_pipeline = Pipeline([
    ('regex', ApplyRegex(regex_transformers)),
    ('stopwords', StopWordsRemoval(stopwords.words('portuguese'))),
    ('stemming', StemmingProcess(RSLPStemmer())),
    ('text_features', TextFeatureExtraction(vectorizer))
])
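
# Minimal usage sketch (commented): the pipeline takes a list of raw document
# strings and yields a dense TF-IDF matrix of at most 300 features. Note that
# min_df=7 means a corpus of at least a few dozen documents is needed before
# any term survives pruning.
#   docs = list(pd.read_csv('combined_news_response.csv')['content'].values)
#   features = text_pipeline.fit_transform(docs)  # shape: (len(docs), <=300)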
# ----------------- Analyzing the sentiments of the whole dataset -------
def sentiment_analyzer(csv_file_name='combined_news_response.csv'):
    df = pd.read_csv(csv_file_name)
    # Drop the stray index column if an older CSV still carries one
    df = df.drop(columns=['Unnamed: 0'], errors='ignore')
    # The articles' full text is the model input
    X = list(df['content'].values)
    # Applying the preprocessing pipeline
    X_processed = text_pipeline.fit_transform(X)
    # Load the saved model and use it for inference
    loaded_model_nb = joblib.load("Naive Bayes_model.joblib")
    sentiments = loaded_model_nb.predict(X_processed)
    # Map numeric labels to sentiment names
    sentiment_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}
    print(f"df['content'].values ==> {len(df['content'].values)} \n sentiments length ==> {len(sentiments)}")
    # Build the result DataFrame
    sentiment_df = pd.DataFrame({
        'content': df['content'].values,
        'sentiment': [sentiment_mapping[sent] for sent in sentiments]
    })
    return sentiment_df
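
# Usage sketch (assumes combined_news_response.csv and "Naive Bayes_model.joblib"
# are present in the working directory):
#   sentiment_df = sentiment_analyzer()
#   print(sentiment_df.head())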
# Creating the app for both features
with gr.Blocks() as demo:
    gr.Markdown("# Welcome to the News Retrieval and Sentiment Analyzer App, a.k.a. InfoMood Tracker")
    gr.Markdown("## The best tracker for your news around the globe!")
    with gr.Accordion("Steps to run the App"):
        gr.Markdown("1. Select the domain from which you want to retrieve the news.")
        gr.Markdown("2. Click `Retrieve news` to retrieve the news from the domain. You should see the results displayed as a table.")
        gr.Markdown("3. Click `Analyze Sentiment` to analyze the sentiment of the retrieved news.")
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_domain = gr.Dropdown(["bbc", "forbes", "businessinsider_us"], label="Select Domain")
            retrieve_button = gr.Button("Retrieve news")
            news_output = gr.Dataframe(type="pandas", wrap=True, label="News retrieved from the selected domain")
            retrieve_button.click(call_functions, inputs=ui_domain, outputs=news_output)
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_input = gr.Textbox(value='combined_news_response.csv', visible=False)
            view_sentiment_bttn = gr.Button("Analyze Sentiment")
            sentiment_output = gr.Dataframe(type="pandas", wrap=True, label="News along with Sentiment")
            view_sentiment_bttn.click(sentiment_analyzer, inputs=ui_input, outputs=sentiment_output)

demo.launch(debug=True)