"""News retrieval, sentiment analysis and visualisation helpers for the app.

The module fetches news through the NewsData client, merges the raw JSON
responses into one CSV, labels every article with a saved Naive Bayes model
and renders a pie chart / word clouds for the labelled data.
"""
import json
import os
import re
from collections import Counter

import gradio as gr
import joblib
import matplotlib.pyplot as plt
import nltk
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import plotly.offline as py
import seaborn as sns
from matplotlib.gridspec import GridSpec
from newsdataapi import NewsDataApiClient
from nltk.corpus import stopwords
from nltk.stem import RSLPStemmer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.pipeline import Pipeline
from wordcloud import WordCloud

# The NLTK resources must be present before `stopwords.words(...)` and
# `RSLPStemmer()` are evaluated further down in this module.
nltk.download('stopwords')
nltk.download('rslp')


#--------------------------------------------------------------------------------------
#------------------------ NEWS DATA RETRIEVER------------------------------------------
#--------------------------------------------------------------------------------------
def creating_data_dir(directory_path):
    """Create ``directory_path`` if needed and print whether it exists afterwards.

    Args:
        directory_path: Path of the directory to create.
    """
    # exist_ok=True makes the call a no-op when the directory is already there.
    os.makedirs(directory_path, exist_ok=True)

    if os.path.exists(directory_path):
        print(f"Directory '{directory_path}' created successfully.")
    else:
        print(f"Failed to create directory '{directory_path}'.")


def retrieve_news_per_keyword(api, keywords, domain):
    """Fetch one news response per keyword and store each as a JSON file.

    Every response is written to ``response_<keyword>.json`` inside the
    module-level ``directory_path``.

    Args:
        api: Client object exposing ``news_api(...)``.
        keywords: Keywords to process. The sequence is only read; the previous
            implementation removed items from it while iterating, which
            skipped every other keyword and emptied the caller's list.
        domain: Key of the module-level ``domain_dict`` selecting the site.

    Raises:
        KeyError: If ``domain`` is not a key of ``domain_dict``.
    """
    selected_domain_url = domain_dict[domain]

    # Iterate over a snapshot so the loop is unaffected by any change a
    # caller makes to ``keywords`` while requests are in flight.
    for keyword in list(keywords):
        # NOTE(review): the request does not contain `keyword` (an older,
        # commented-out call passed `q=keyword`), so every file presumably
        # receives the same articles -- confirm whether that is intended.
        response = api.news_api(
            domainurl=selected_domain_url,
            category=['business', 'technology', 'politics'],
            timeframe=48,
            language='en',
            full_content=True,
            size=10,
        )

        file_path = os.path.join(directory_path, f"response_{keyword}.json")
        with open(file_path, "w") as outfile:
            json.dump(response, outfile)

        print(f"News Response for keyword {keyword} is retrieved")


def combine_responses_into_one(directory_path):
    """Merge all ``response_<keyword>.json`` files into one JSON list.

    Each article of a response gets a ``file_keyword`` entry holding the
    keyword taken from the file name. The merged list is written to
    ``combined_news_response.json`` in the current working directory.

    Args:
        directory_path: Directory containing the per-keyword response files.
    """
    prefix, suffix = 'response_', '.json'
    combined_json = []

    # sorted() makes the merge order independent of the file system.
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)
        if not os.path.isfile(file_path):
            continue
        # Files that do not follow the naming scheme used to crash the
        # keyword extraction; they are skipped instead.
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            print(f'{filename} is skipped (not a news response file)')
            continue

        file_keyword = filename[len(prefix):-len(suffix)]

        with open(file_path, 'r') as file:
            current_json = json.load(file)

        results = current_json.get('results') if isinstance(current_json, dict) else None
        if not isinstance(results, list):
            # A response without a list of articles cannot be merged.
            print(f'{filename} is skipped (no list of results)')
            continue

        for result in results:
            result['file_keyword'] = file_keyword
        combined_json.extend(results)

        print(f'{filename} is added to the combined json object')

    with open('combined_news_response.json', 'w') as combined_file:
        json.dump(combined_json, combined_file, indent=4)


def convert_json_to_csv(file_name):
    """Write selected columns of a combined JSON file to a CSV file.

    The output is always ``combined_news_response.csv``. The DataFrame index
    is written as well, which is why readers of the CSV see an
    ``Unnamed: 0`` column.

    Args:
        file_name: Path of the JSON file to convert.
    """
    json_data_df = pd.read_json(file_name)

    columns = ['title', 'pubDate', 'content', 'country', 'category', 'language']
    csv_file_name = 'combined_news_response.csv'

    # reindex() keeps the column layout stable even when the JSON is empty or
    # lacks one of the expected fields (plain indexing raised KeyError).
    json_data_df.reindex(columns=columns).to_csv(csv_file_name)
    print(f'{csv_file_name} is created')


#-------------------------------------First Function called from the UI----------------------------
# API key authorization, Initialize the client with your API key
# SECURITY: prefer supplying the key through the NEWSDATA_API_KEY environment
# variable; the literal below is only kept as a backward-compatible fallback
# and should be rotated and removed from source control.
NEWSDATA_API_KEY = os.environ.get(
    "NEWSDATA_API_KEY", "pub_2915202f68e543f70bb9aba9611735142c1fd"
)

keywords = [
    "GDP", "CPI", "PPI", "Unemployment Rate", "Interest Rates", "Inflation",
    "Trade Balance", "Retail Sales", "Manufacturing Index", "Earnings Reports",
    "Revenue Growth", "Profit Margins", "Earnings Surprises", "Geopolitical Events",
    "Trade Tensions", "Elections", "Natural Disasters", "Global Health Crises",
    "Oil Prices", "Gold Prices", "Precious Metals", "Agricultural Commodities",
    "Federal Reserve", "ECB", "Forex Market", "Exchange Rates", "Currency Pairs",
    "Tech Company Earnings", "Tech Innovations", "Retail Trends", "Consumer Sentiment",
    "Financial Regulations", "Government Policies", "Technical Analysis",
    "Fundamental Analysis", "Cryptocurrency News", "Bitcoin", "Altcoins",
    "Cryptocurrency Regulations", "S&P 500", "Dow Jones", "NASDAQ", "Market Analysis",
    "Stock Market Indices",
]

domain_dict = {
    'bbc': 'bbc.com',
    'forbes': 'forbes.com',
    'businessinsider_us': 'businessinsider.com',
}

# Directory in which the per-keyword API responses are stored
directory_path = './data'


def call_functions(domain):
    """Retrieve (if necessary) and combine the news, then return a preview.

    * Empty data directory: fetch the news for every keyword, combine the
      responses and create the CSV.
    * Two or more entries: only combine the existing responses and recreate
      the CSV.
    * Exactly one entry: nothing is fetched or combined.

    Args:
        domain: Key of ``domain_dict`` selecting the news site.

    Returns:
        The first 10 rows of ``combined_news_response.csv`` as a DataFrame,
        or a message string when that file does not exist.
    """
    creating_data_dir(directory_path)
    items = os.listdir(directory_path)
    file_name = './combined_news_response.json'

    if not items:
        print(f"Directory '{directory_path}' is empty.")
        api = NewsDataApiClient(apikey=NEWSDATA_API_KEY)
        retrieve_news_per_keyword(api, keywords, domain)
        combine_responses_into_one(directory_path)
        convert_json_to_csv(file_name)
    elif len(items) >= 2:
        print(f"Directory '{directory_path}' contains at least two files.")
        combine_responses_into_one(directory_path)
        convert_json_to_csv(file_name)
    else:
        print(f"Directory '{directory_path}' contains only one file.")

    # Read the combined CSV file and return the first few rows
    csv_file_name = "combined_news_response.csv"
    if not os.path.exists(csv_file_name):
        return f"CSV file '{csv_file_name}' not found."

    df = pd.read_csv(csv_file_name)
    # The column only exists because the CSV was written with its index.
    df = df.drop(columns='Unnamed: 0', errors='ignore')
    return df.head(10)


#--------------------------------------------------------------------------------------
#------------------------ SENTIMENT ANALYZER------------------------------------------
#--------------------------------------------------------------------------------------
# Get English stopwords
en_stopwords = stopwords.words('english')


#---------------- Data Prepocessing ----------
def re_breakline(text_list):
    """Replace every newline / carriage return with a space in each text."""
    return [re.sub(r'[\n\r]', ' ', r) for r in text_list]


def re_hyperlinks(text_list):
    """Replace every http(s) URL with the token ' link ' in each text."""
    pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    return [re.sub(pattern, ' link ', r) for r in text_list]


def re_dates(text_list):
    """Replace dd/mm/yy(yy) and dd.mm.yy(yy) dates with the token ' date '."""
    pattern = r'([0-2][0-9]|(3)[0-1])(\/|\.)(((0)[0-9])|((1)[0-2]))(\/|\.)\d{2,4}'
    return [re.sub(pattern, ' date ', r) for r in text_list]


def re_money(text_list):
    """Replace '$'/'R$' amounts with a decimal part by the token ' paisa '."""
    pattern = r'[R]{0,1}\$[ ]{0,}\d+(,|\.)\d+'
    return [re.sub(pattern, ' paisa ', r) for r in text_list]


def re_numbers(text_list):
    """Replace every run of digits with the token ' num ' in each text."""
    return [re.sub(r'[0-9]+', ' num ', r) for r in text_list]


def re_negation(text_list):
    """Replace 'nao'/'não' (any case), 'ñ' and a space-delimited 'n' by ' negate '.

    NOTE(review): these look like Portuguese negations; the pattern has no
    English counterpart such as "not" -- confirm this is intended for
    English news text.
    """
    return [re.sub(r'([nN][ãÃaA][oO]|[ñÑ]| [nN] )', ' negate ', r) for r in text_list]


def re_special_chars(text_list):
    """Replace every non-word character with a space in each text."""
    return [re.sub(r'\W', ' ', r) for r in text_list]


def re_whitespaces(text_list):
    """Collapse whitespace runs to one space and strip trailing blanks."""
    collapsed = [re.sub(r'\s+', ' ', r) for r in text_list]
    return [re.sub(r'[ \t]+$', '', r) for r in collapsed]


def stopwords_removal(text, cached_stopwords=en_stopwords):
    """Return the lower-cased tokens of ``text`` that are not stopwords.

    Args:
        text: Text that is split on whitespace.
        cached_stopwords: Collection of (lower-case) stopwords.
    """
    # A set gives constant-time membership tests for every token.
    stop_set = set(cached_stopwords)
    lowered = (token.lower() for token in text.split())
    return [token for token in lowered if token not in stop_set]


def stemming_process(text, stemmer=RSLPStemmer()):
    """Return the stem of every whitespace-separated token of ``text``.

    NOTE(review): NLTK documents RSLP as a stemmer for Portuguese, while the
    stopwords used in this module are English -- confirm the choice.
    """
    return [stemmer.stem(c) for c in text.split()]


class ApplyRegex(BaseEstimator, TransformerMixin):
    """Transformer applying a dictionary of regex functions in order."""

    def __init__(self, regex_transformers):
        # Mapping of a descriptive name to a function taking a list of texts.
        self.regex_transformers = regex_transformers

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer unchanged."""
        return self

    def transform(self, X, y=None):
        """Feed ``X`` through every regex function, in dictionary order."""
        for regex_function in self.regex_transformers.values():
            X = regex_function(X)
        return X


# Class for stopwords removal from the corpus
class StopWordsRemoval(BaseEstimator, TransformerMixin):
    """Transformer removing stopwords from every text of the corpus."""

    def __init__(self, text_stopwords):
        self.text_stopwords = text_stopwords

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the texts of ``X`` re-joined without their stopwords."""
        return [' '.join(stopwords_removal(comment, self.text_stopwords)) for comment in X]


# Class for apply the stemming process
class StemmingProcess(BaseEstimator, TransformerMixin):
    """Transformer stemming every token of every text of the corpus."""

    def __init__(self, stemmer):
        self.stemmer = stemmer

    def fit(self, X, y=None):
        """Nothing to learn; return the transformer unchanged."""
        return self

    def transform(self, X, y=None):
        """Return the texts of ``X`` re-joined from their stemmed tokens."""
        return [' '.join(stemming_process(comment, self.stemmer)) for comment in X]


# Class for extracting features from corpus
class TextFeatureExtraction(BaseEstimator, TransformerMixin):
    """Transformer turning texts into a dense matrix via a vectorizer."""

    def __init__(self, vectorizer):
        self.vectorizer = vectorizer

    def fit(self, X, y=None):
        """Fit the wrapped vectorizer on ``X`` and return the transformer.

        Previously ``fit`` did nothing and ``transform`` refitted the
        vectorizer on every call, so a fitted vocabulary could never be
        reused. ``fit_transform`` yields the same result as before.
        """
        self.vectorizer.fit(X)
        return self

    def transform(self, X, y=None):
        """Vectorize ``X`` with the fitted vectorizer as a dense array."""
        return self.vectorizer.transform(X).toarray()


#----------------------------Creating Pipeline for Preparing the data-----
# Defining regex transformers to be applied (dictionary order = execution order)
regex_transformers = {
    'break_line': re_breakline,
    'hiperlinks': re_hyperlinks,
    'dates': re_dates,
    'money': re_money,
    'numbers': re_numbers,
    'negation': re_negation,
    'special_chars': re_special_chars,
    'whitespaces': re_whitespaces,
}

# Defining the vectorizer to extract features from text
vectorizer = TfidfVectorizer(max_features=300, min_df=7, max_df=0.8, stop_words=en_stopwords)

# Building the Pipeline
text_pipeline = Pipeline([
    ('regex', ApplyRegex(regex_transformers)),
    ('stopwords', StopWordsRemoval(en_stopwords)),
    ('stemming', StemmingProcess(RSLPStemmer())),
    ('text_features', TextFeatureExtraction(vectorizer)),
])


#----------------- Analyzing the Sentiments of whole dataset-------
def sentiment_analyzer(csv_file_name='combined_news_response.csv'):
    """Label every article of a CSV file with a sentiment and save the result.

    The ``content`` column is run through ``text_pipeline`` and the saved
    model ``Naive Bayes_model.joblib``; the outcome is written to
    ``sentiment.csv``.

    Args:
        csv_file_name: CSV file with (at least) a ``content`` column.

    Returns:
        DataFrame with the columns ``content`` and ``sentiment``.
    """
    df = pd.read_csv(csv_file_name)
    # The index column is only present when the CSV was written with it.
    df = df.drop(columns='Unnamed: 0', errors='ignore')

    # Missing content is read back as NaN (a float) and would make the regex
    # helpers fail, so it is treated as an empty text.
    contents = df['content'].fillna('').astype(str)
    X = contents.tolist()

    # NOTE(review): the pipeline is re-fitted on the incoming articles, so the
    # features are presumably not the ones the saved model was trained on --
    # consider persisting the fitted pipeline next to the model.
    X_processed = text_pipeline.fit_transform(X)

    # Load the saved model and use it for inference
    loaded_model_nb = joblib.load("Naive Bayes_model.joblib")
    sentiments = loaded_model_nb.predict(X_processed)

    # Sentiment mapping from predicted class to label
    sentiment_mapping = {0: 'negative', 1: 'neutral', 2: 'positive'}

    print(f"df['content'].values ==> {len(contents)} \n sentiments length ==> {len(sentiments)}")

    sentiment_df = pd.DataFrame({
        'content': contents.values,
        'sentiment': [sentiment_mapping[sent] for sent in sentiments],
    })
    sentiment_df.to_csv('sentiment.csv')
    print('Sentiment df saved as "sentiment.csv"')

    return sentiment_df


#----------------------------------------------------------------------------------------------
#----------------------------------DATA VIZUALIZER---------------------------------------------
#----------------------------------------------------------------------------------------------
def get_senti_pct_distribution(expt_df):
    """Draw a pie chart with the share of every sentiment label.

    Args:
        expt_df: DataFrame with a ``sentiment`` column.

    Returns:
        The ``matplotlib.pyplot`` module holding the new current figure.
    """
    sentiment_counts = expt_df['sentiment'].value_counts()
    labels = sentiment_counts.index
    sizes = sentiment_counts.values

    # Colours are bound to the label; the positional list used before gave a
    # sentiment a different colour whenever the frequency ranking changed.
    label_colors = {
        'neutral': 'lightblue',
        'positive': 'limegreen',
        'negative': 'lightcoral',
    }
    colors = [label_colors.get(label, 'lightgrey') for label in labels]

    plt.figure(figsize=(8, 8))
    plt.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=140)
    # Equal aspect ratio ensures that pie is drawn as a circle
    plt.axis('equal')
    plt.title('Sentiment Distribution for Labelled Data')

    return plt


def preprocessing_data(expt_df):
    """Add one column per text-cleaning stage to ``expt_df``.

    The ``content`` column is passed through the regex helpers one after the
    other; the output of every stage is stored in a column named after the
    helper, and the stopword-free text in ``stopwords_removed``.

    Args:
        expt_df: DataFrame with a ``content`` column; modified in place.

    Returns:
        The same DataFrame, with the additional columns.
    """
    stages = (
        ('re_breakline', re_breakline),
        ('re_hyperlinks', re_hyperlinks),
        ('re_dates', re_dates),
        ('re_money', re_money),
        ('re_numbers', re_numbers),
        ('re_negation', re_negation),
        ('re_special_chars', re_special_chars),
        ('re_whitespaces', re_whitespaces),
    )

    # Empty content comes back from CSV as NaN; treat it as an empty text so
    # the regex helpers always receive strings.
    texts = expt_df['content'].fillna('').astype(str).tolist()
    for column_name, stage in stages:
        texts = stage(texts)
        expt_df[column_name] = texts

    expt_df['stopwords_removed'] = [' '.join(stopwords_removal(text)) for text in texts]

    return expt_df


def _sentiment_wordcloud(processed_expt_df, sentiment, colormap):
    """Build the word cloud of the rows labelled with ``sentiment``."""
    is_selected = processed_expt_df['sentiment'] == sentiment
    texts = list(processed_expt_df.loc[is_selected, 'stopwords_removed'].values)
    word_counts = Counter(' '.join(texts).split(' '))

    return WordCloud(
        width=1280,
        height=720,
        collocations=False,
        random_state=42,
        colormap=colormap,
        background_color='white',
        max_words=50,
    ).generate_from_frequencies(word_counts)


def generate_wc(processed_expt_df):
    """Create the word clouds of the positive, negative and neutral news.

    Args:
        processed_expt_df: DataFrame with the columns ``sentiment`` and
            ``stopwords_removed``.

    Returns:
        Tuple ``(positive_wc, negative_wc, neutral_wc)``.
    """
    positive_wc = _sentiment_wordcloud(processed_expt_df, 'positive', 'Blues')
    negative_wc = _sentiment_wordcloud(processed_expt_df, 'negative', 'Reds')
    neutral_wc = _sentiment_wordcloud(processed_expt_df, 'neutral', 'Greens')

    return positive_wc, negative_wc, neutral_wc


def plot_news_wc(positive_wc, negative_wc, neutral_wc):
    """Plot the three word clouds below each other and return the figure.

    Args:
        positive_wc: Word cloud of the positive news.
        negative_wc: Word cloud of the negative news.
        neutral_wc: Word cloud of the neutral news.

    Returns:
        The matplotlib figure with 3 rows and 1 column of axes.
    """
    fig, axs = plt.subplots(3, 1, figsize=(10, 30))  # 3 rows, 1 column

    panels = (
        (positive_wc, 'WordCloud for Positive Words in News'),
        (negative_wc, 'WordCloud for Negative Words in News'),
        (neutral_wc, 'WordCloud for Neutral Words in News'),
    )
    for ax, (wordcloud, title) in zip(axs, panels):
        ax.imshow(wordcloud)
        ax.axis('off')
        ax.set_title(title, size=18, pad=20)

    return fig


def get_news_wc(expt_df):
    """Preprocess ``expt_df`` and return the figure with its word clouds."""
    processed_expt_df = preprocessing_data(expt_df)
    positive_wc, negative_wc, neutral_wc = generate_wc(processed_expt_df)
    return plot_news_wc(positive_wc, negative_wc, neutral_wc)


def call_data_viz_func(plot_type):
    """Create the plot selected in the UI from ``sentiment.csv``.

    Args:
        plot_type: ``'percentage_plot'`` or ``'word_count_plot'``.

    Returns:
        The plot produced by the matching helper.

    Raises:
        ValueError: If ``plot_type`` is neither of the two known values.
    """
    senti_csv_file_name = 'sentiment.csv'
    expt_df = pd.read_csv(senti_csv_file_name)

    if plot_type == 'percentage_plot':
        return get_senti_pct_distribution(expt_df)
    if plot_type == 'word_count_plot':
        return get_news_wc(expt_df)
    raise ValueError("Unknown plot type selected")


#---------------------- GRADIO APP --------------------
# Gradio front end: one row each for retrieving the news, analysing their
# sentiment and visualising the labelled data.
# NOTE(review): the nesting below is reconstructed -- the original indentation
# was not available; it is assumed that the accordion wraps the four steps and
# the NOTE, and that the two links sit below it. Confirm against the app.
with gr.Blocks() as demo:
    gr.Markdown("# Welcome to News Retrieval and Sentiment Analyzer App a.k.a InfoMood Tracker")
    gr.Markdown("## Best tracker for your News around the globe ! ")

    with gr.Accordion("Steps to run the App"):
        # One Markdown component per instruction, created in this order.
        for instruction in (
            "1. Select the Domain from which you want to retrieve the news",
            "2. Click on the `Retrieve news` to retrieve the news from the domain. You Should see that the result displayed in the form of Table",
            "3. Click on the `Analyze Sentiment` to analyze the sentiments of the news retrieved.",
            "4. Select the radio button `percentage_plot` or `word_count_plot`. Click on the `Vizualize data` to view the respective Vizualization. If needed click the `Clear` Button to clear the plot ",
            "NOTE: Each depends on the file saved the it's previous step, so the sequence is important. For example, you can't get the data viz until and unless you have the Sentiment Analyzed File ",
        ):
            gr.Markdown(instruction)

    gr.Markdown("* App Link(Curernt page) : [Hugging Face Spaces link](https://huggingface.co/spaces/pknayak/sentiement_app)")
    gr.Markdown("* Documentation : [Notion Link](https://mekongdelta.notion.site/Documentation-for-the-InfoMood-app-e310b4fb371c446daa2405f7efdd5b13)")

    # GRADIO ROW FOR NEWS COLLECTOR
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_domain = gr.Dropdown(
                ["forbes", "bbc", "businessinsider_us"],
                label="Select Domain",
            )
            retrieve_button = gr.Button("Retrieve news")
            df_output = gr.Dataframe(
                type="pandas",
                wrap=True,
                label="News retrieved from the selected domain",
            )
            # Bound before `df_output` is reassigned further down, so this
            # handler keeps writing to the news table.
            retrieve_button.click(
                fn=call_functions,
                inputs=ui_domain,
                outputs=df_output,
            )

    # GRADIO ROW FOR ANALYSING SENTIMENT
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            # Hidden textbox carrying the CSV file name to the analyzer.
            ui_input = gr.Textbox(value='combined_news_response.csv', visible=False)
            view_sentiment_bttn = gr.Button("Analyze Sentiment")
            df_output = gr.Dataframe(
                type="pandas",
                wrap=True,
                label="News along with Sentiment",
            )
            view_sentiment_bttn.click(
                fn=sentiment_analyzer,
                inputs=ui_input,
                outputs=df_output,
            )

    # Row for the data visualisation
    with gr.Row():
        with gr.Column(scale=1, min_width=600):
            ui_plot_type = gr.Radio(
                label="Plot type",
                choices=["percentage_plot", "word_count_plot"],
                value='percentage_plot',
            )
            data_viz_bt = gr.Button("Vizualize data")
            plt_output = gr.Plot(
                label="Data Vizualizer for the News App",
                show_label=True,
            )
            gr.ClearButton(plt_output)
            data_viz_bt.click(
                fn=call_data_viz_func,
                inputs=ui_plot_type,
                outputs=plt_output,
            )

demo.launch(debug=True)