import numpy as np import pandas as pd from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score import joblib import cv2 from deepface import DeepFace import random import gradio as gr import matplotlib.pyplot as plt from transformers import pipeline from PIL import Image from ntscraper import Nitter import csv def video_sentiment_score(sentiment): sentiment_mapping = { "happy": 1.0, "sad": -1.0, "angry": -1.5, "surprised": 0.5, "neutral": 0.0 } return sentiment_mapping.get(sentiment, 0.0) def text_emotion_score(emotion): emotion_mapping = { "joy": 1.0, "sadness": -1.0, "anger": -1.5, "surprise": 0.5, "neutral": 0.0, "disgust": -1.5, "fear": -1.0 } return emotion_mapping.get(emotion, 0.0) def environment_score(environment): environment_mapping = { "Good": 1.0, "Moderate": 0.0, "Bad": -1.0 } return environment_mapping.get(environment, 0.0) # Scraper function def scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date): print(f"num_of_tweets before conversion: {num_of_tweets}") num_of_tweets = int(num_of_tweets) import httpx httpx._config.DEFAULT_TIMEOUT = httpx.Timeout(3.0) scraper = Nitter() tweets = scraper.get_tweets( hashtag, mode='hashtag', number=num_of_tweets, since=since_date, until=until_date ) final_tweets = [] with open('tweets_kuru.csv', 'w', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow([f'Scraping Tweets for #{hashtag}']) writer.writerow(['User', 'Username', 'Tweet', 'Date']) for tweet in tweets['tweets']: tweet_details = [tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']] writer.writerow([tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']]) final_tweets.append(tweet_details) tweet_df = pd.DataFrame(final_tweets, columns=['User', 'Username', 'Tweet', 'Date']) return tweet_df # Sensor Simulate Data np.random.seed(42) data_size = 1000 aqi_values = np.random.randint(0, 500, size=data_size) noise_levels = np.random.randint(30, 110, size=data_size) temperatures = np.random.randint(-10, 40, size=data_size) humidity_levels = np.random.randint(10, 90, size=data_size) pm25_values = np.random.randint(0, 500, size=data_size) co2_levels = np.random.randint(250, 6000, size=data_size) def classify_environment(aqi, noise, temp, humidity, pm25, co2): if aqi > 150 or noise > 80 or temp > 35 or humidity > 80 or pm25 > 55 or co2 > 2000: return "Bad" elif aqi > 100 or noise > 60 or temp > 30 or humidity > 60 or pm25 > 35 or co2 > 1000: return "Moderate" else: return "Good" labels = [classify_environment(aqi, noise, temp, humidity, pm25, co2) for aqi, noise, temp, humidity, pm25, co2 in zip(aqi_values, noise_levels, temperatures, humidity_levels, pm25_values, co2_levels)] data = pd.DataFrame({ 'AQI': aqi_values, 'Noise': noise_levels, 'Temperature': temperatures, 'Humidity': humidity_levels, 'PM2.5': pm25_values, 'CO2': co2_levels, 'Label': labels }) # Train a Simple Classification Model X = data[['AQI', 'Noise', 'Temperature', 'Humidity', 'PM2.5', 'CO2']] y = data['Label'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) model = RandomForestClassifier() model.fit(X_train, y_train) predictions = model.predict(X_test) print("Accuracy:", accuracy_score(y_test, predictions)) # Save the model joblib.dump(model, 'environment_model.pkl') # Function to analyze video sentiment def analyze_video_sentiment(video_path, num_frames=10, detector_backend='retinaface'): cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Select num_frames random frame indices frame_indices = random.sample(range(total_frames), num_frames) emotions = {"happy": 0, "sad": 0, "angry": 0, "surprised": 0, "neutral": 0} frame_images = [] for idx in frame_indices: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if not ret: continue # Convert to RGB rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Face detection and emotion analysis try: results = DeepFace.analyze(rgb_frame, actions=["emotion"], enforce_detection=True, detector_backend=detector_backend) for result in results: if result is None or result == {}: continue # Draw bounding box face_coordinates = result["region"] x1, y1, x2, y2 = face_coordinates["x"], face_coordinates["y"], face_coordinates["x"] + face_coordinates[ "w"], face_coordinates["y"] + face_coordinates["h"] cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # Add emotion label above the bounding box dominant_emotion = result["dominant_emotion"] cv2.putText(frame, dominant_emotion, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) # Update emotion counts if dominant_emotion in emotions: emotions[dominant_emotion] += 1 except ValueError as e: if "No face detected" in str(e): continue else: raise e # Convert frame to image for Gradio display frame_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) frame_images.append(frame_image) cap.release() cv2.destroyAllWindows() # Determine dominant emotion dominant_emotion = max(emotions, key=emotions.get) # Return dominant emotion and frame images return dominant_emotion, frame_images # Load the classifier for sentiment analysis classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", return_all_scores=True) def classify_tweets(tweets_df): if tweets_df.empty: return "No tweets to analyze." tweet_texts = tweets_df['Tweet'].tolist() results = classifier(tweet_texts) emotions = [max(result, key=lambda x: x['score'])['label'] for result in results] tweet_df = tweets_df.copy() tweet_df['Sentiment'] = emotions # Plot sentiment distribution sentiment_counts = tweet_df['Sentiment'].value_counts() fig, ax = plt.subplots(figsize=(8, 5)) bars = ax.bar(sentiment_counts.index, sentiment_counts.values, color=['#FF6F61', '#6B5B95', '#88B04B', '#F7CAC9', '#92A8D1', '#955251']) ax.set_xlabel('Sentiment', fontsize=14, fontweight='bold', color='#34495E') ax.set_ylabel('Count', fontsize=14, fontweight='bold', color='#34495E') ax.set_title('Tweet Sentiment Distribution', fontsize=18, fontweight='bold', color='#2E4053') ax.tick_params(axis='x', rotation=0, colors='#34495E', labelsize=12) ax.tick_params(axis='y', colors='#34495E', labelsize=12) ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.spines['left'].set_visible(False) ax.spines['bottom'].set_visible(False) ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=.45) ax.xaxis.set_tick_params(width=0) for bar in bars: yval = bar.get_height() ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.01, round(yval, 2), ha='center', va='bottom', color='#34495E', fontsize=12, fontweight='bold') plt.tight_layout() return tweet_df, fig def classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date): tweet_df = scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date) tweet_df, fig = classify_tweets(tweet_df) return tweet_df, fig # Function to classify overall sentiment def classify_overall_sentiment(video, hashtag, mode, num_of_tweets, since_date, until_date, aqi, noise, temp, humidity, pm25, co2): # Video Sentiment Analysis video_sentiment, frame_images = analyze_video_sentiment(video) # Social Media Sentiment Analysis tweet_df, plot = classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date) text_emotion = tweet_df['Sentiment'].value_counts().idxmax() if not tweet_df.empty else "No text analyzed" # Environment Sentiment Analysis environment = classify_environment(aqi, noise, temp, humidity, pm25, co2) # Calculate overall sentiment and create a plot overall_sentiment = f"Video Sentiment: {video_sentiment}, Environment Sentiment: {environment}, Text Emotion: {text_emotion}" sentiments = ["Video Sentiment", "Text Emotion", "Environment Sentiment"] scores = [video_sentiment_score(video_sentiment), text_emotion_score(text_emotion), environment_score(environment)] fig, ax = plt.subplots() ax.plot(sentiments, scores, marker='o') ax.set_xlabel('Sentiment Source', fontsize=14, fontweight='bold', color='#34495E') ax.set_ylabel('Sentiment Score', fontsize=14, fontweight='bold', color='#34495E') ax.set_title('Overall Sentiment Scores', fontsize=18, fontweight='bold', color='#2E4053') plt.tight_layout() return overall_sentiment, frame_images, tweet_df, plot, fig # Create Gradio Interfaces example_video = "TimesSquare.mp4" #hachaudhfikadjsfbhisuadbikahus video_interface = gr.Interface( fn=analyze_video_sentiment, inputs=[ gr.Video(value=example_video), # Adding the example video here gr.Slider(minimum=1, maximum=20, step=1), gr.Radio(["retinaface", "mtcnn", "opencv", "ssd", "dlib", "mediapipe"], label="Detector Backend", value="retinaface") ], outputs=["text", gr.Gallery(label="Analyzed Frames")], title="Video Sentiment Analysis", ) text_interface = gr.Interface( fn=classify_and_plot, inputs=[gr.Textbox(label="Hashtag"), gr.Radio(["latest", "top"], label="Mode"), gr.Slider(1, 1000, step=1, label="Number of Tweets"), gr.Textbox(label="Since Date (YYYY-MM-DD)"), gr.Textbox(label="Until Date (YYYY-MM-DD)")], outputs=[gr.DataFrame(label="Scraped Tweets"), gr.Plot()], title="Social Media Sentiment Analysis" ) environment_interface = gr.Interface( fn=classify_environment, inputs=[gr.Slider(minimum=0, maximum=500, step=1, label="AQI"), gr.Slider(minimum=0, maximum=110, step=1, label="Noise"), gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"), gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"), gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"), gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")], outputs="text", title="Environment Sentiment Analysis" ) # The overall_interface # Update the overall_interface overall_interface = gr.Interface( fn=classify_overall_sentiment, inputs=[gr.Video(), gr.Textbox(label="Hashtag"), gr.Radio(["latest", "top"], label="Mode"), gr.Slider(1, 1000, step=1, label="Number of Tweets"), gr.Textbox(label="Since Date (YYYY-MM-DD)"), gr.Textbox(label="Until Date (YYYY-MM-DD)"), gr.Slider(minimum=0, maximum=500, step=1, label="AQI"), gr.Slider(minimum=0, maximum=110, step=1, label="Noise"), gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"), gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"), gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"), gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")], outputs=["text", gr.Gallery(), gr.DataFrame(), gr.Plot(), gr.Plot()], title="Overall Sentiment Analysis" ) scraper_interface = gr.Interface( fn=scrape_tweets, inputs=[gr.Textbox(label="Hashtag"), gr.Radio(["latest", "top"], label="Mode"), gr.Slider(1, 1000, step=1, label="Number of Tweets"), gr.Textbox(label="Since Date (YYYY-MM-DD)"), gr.Textbox(label="Until Date (YYYY-MM-DD)")], outputs=gr.DataFrame(), title="Scrape Tweets" ) # Combine Interfaces into Tabbed Layout tabbed_interface = gr.TabbedInterface( [video_interface, text_interface, environment_interface, overall_interface, scraper_interface], ["Video Sentiment Analysis", "Social Media Sentiment Analysis", "Environment Sentiment Analysis", "Overall Sentiment Analysis", "Scrape Tweets"]) # Launch the Interface tabbed_interface.launch(debug=True, share=True)