import tensorflow as tf
import numpy as np
import pandas as pd
import swifter
import json
import re
import requests
import time
from keras.preprocessing.text import Tokenizer
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
# from keras.optimizers.optimizer_v2.rmsprop import RMSProp
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Conv1D, Embedding, MaxPooling1D, GlobalMaxPooling1D, GlobalAveragePooling1D, SpatialDropout1D, LSTM, Dropout, SimpleRNN, Bidirectional, Attention, Activation, GRU, TextVectorization, Input
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.sequence import pad_sequences
import arabicstopwords.arabicstopwords as stp
from nltk.stem.isri import ISRIStemmer
from pyarabic.araby import strip_tashkeel, strip_tatweel, tokenize
from huggingface_hub import from_pretrained_keras
from collections import Counter
from fastapi import FastAPI, Request, HTTPException
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import threading
from transformers import (BertTokenizer, AutoModelForSeq2SeqLM, pipeline,
                          AutoTokenizer, AutoModelForCausalLM)
from arabert.preprocess import ArabertPreprocessor
import smtplib
from email.mime.text import MIMEText
import os
import math
import random
import Classes
# Firebase initialization
cred = credentials.Certificate(
    "text-to-emotions-firebase-adminsdk-8isbn-dffbdf01e8.json")
firebase_admin.initialize_app(cred)
db = firestore.client()
# Summarization model
model_name = "abdalrahmanshahrour/auto-arabic-summarization"
preprocessor = ArabertPreprocessor(model_name=model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
summarization_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
pipeline1 = pipeline("text2text-generation", model=summarization_model, tokenizer=tokenizer)
# Emotion model initialization
isristemmer = ISRIStemmer()
model = from_pretrained_keras('MahmoudNasser/GRU-MODEL-EMOTION-AR-TEXT-76jP')

# Dictionary of pending OTP codes, keyed by email address
emailOTP = {}
def stemming(txt):
    return isristemmer.stem(txt)
def remove_singleCharacter(text):
    # Drop one-letter tokens, which carry little meaning in Arabic text
    text_tokenized = tokenize(text)
    return ' '.join(word for word in text_tokenized if len(word) != 1)
# remove_punctuations
def remove_punctuations(text):
    # Replace both Latin and Arabic punctuation marks with spaces
    punc = '''()-[]{};:'"\\,<>./@#$%^&*،؛_~'''
    arabic_punctuations = '''`÷×؛_ـ،/:".,'~¦+|”…“–ـ=﴾﴿ ﹱ ﹹ ⸀˓• ב'''
    punctuations_list = punc + arabic_punctuations
    for x in punctuations_list:
        text = text.replace(x, ' ')
    return text
def normalize_text(txt):
    txt = strip_tashkeel(txt)
    txt = strip_tatweel(txt)
    # Collapse runs of repeated characters into a single character
    txt = ''.join(txt[i] for i in range(len(txt))
                  if i == 0 or txt[i - 1] != txt[i])
    return txt
def remove_stopwords(txt, path="stopword.txt"):
    # Filter out words listed in the stopword file (one word per line)
    with open(path, 'r', encoding='utf-8') as f:
        arabic_stop_words = f.read().split('\n')
    return ' '.join(word for word in txt.split(' ')
                    if word not in arabic_stop_words)
def Remove_unwanted(text):
    # Remove links, Latin letters, and digits, then normalize common
    # Arabic letter variants and collapse whitespace
    text = re.sub(r"http\S+", " ", text)        # strip URLs (http and https)
    text = re.sub(r'[a-zA-Z]+', ' ', text)      # strip Latin letters
    text = re.sub(r'\d+', ' ', text)            # strip digits
    text = re.sub(r'[إأٱآا]', 'ا', text)        # normalize alef variants
    text = re.sub(r'ى', 'ي', text)              # normalize alef maqsura to ya
    text = re.sub(r'[ؤئ]', 'ء', text)           # normalize hamza variants
    text = re.sub(r'\s+', ' ', text).strip()    # collapse and trim whitespace
    return text
def txt_preprocess(text):
    # Full preprocessing pipeline applied before emotion prediction
    text = normalize_text(text)
    text = stemming(text)
    text = remove_stopwords(text)
    text = remove_punctuations(text)
    text = Remove_unwanted(text)
    return text
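# A minimal usage sketch of the preprocessing pipeline. The sample sentence is
# illustrative only; the exact output depends on the ISRI stemmer and on the
# contents of stopword.txt.
def _demo_preprocess():
    sample = "أنا سعيدددد جداً بهذا التطبيق 123 http://example.com"
    print(txt_preprocess(sample))  # URL, digits, and repeats are stripped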
def see_if_thereupdates():
    # Read the first line of the updates file
    with open("updates.txt", "r") as f:
        return f.readline()
def getmodel():
    m = from_pretrained_keras('MahmoudNasser/GRU-MODEL-EMOTION-AR-TEXT-72P')
    return m
def original_values(num):
    # Map a class index to its emotion label
    labels = {0: 'anger', 1: 'sadness', 2: 'joy', 3: 'surprise',
              4: 'love', 5: 'sympathy', 6: 'fear'}
    return labels.get(num)
def modelsummary(data):
    # Generate an abstractive summary with beam search, then strip punctuation
    result = pipeline1(data,
                       pad_token_id=tokenizer.eos_token_id,
                       num_beams=4,
                       repetition_penalty=3.0,
                       max_length=600,
                       length_penalty=0.50,
                       no_repeat_ngram_size=3)[0]['generated_text']
    result = remove_punctuations(result)
    return {'summary': result}
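# Illustrative only: modelsummary("<long Arabic complaint>") returns a dict of
# the form {'summary': '<generated Arabic summary>'}. The generation settings
# above trade repetition (repetition_penalty, no_repeat_ngram_size) against
# output length (max_length, length_penalty).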
def modelpredict(data):
    # Preprocess the text, then return the per-emotion probabilities
    data = txt_preprocess(data)
    pred = model.predict(pd.Series([data]))
    return {'anger': float(pred[0][0]), 'sadness': float(pred[0][1]),
            'joy': float(pred[0][2]), 'surprise': float(pred[0][3]),
            'love': float(pred[0][4]), 'sympathy': float(pred[0][5]),
            'fear': float(pred[0][6])}
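# A small helper sketch, not part of the original API: pick the dominant
# emotion from a modelpredict() result, assuming its values are floats.
def top_emotion(pred):
    return max(pred, key=pred.get)

# e.g. top_emotion(modelpredict(text)) -> 'anger'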
# OTP code
def generate_otp(email):
    # Build a 6-digit one-time password and remember it for this email
    digits = "0123456789"
    otp_code = ''.join(random.choice(digits) for _ in range(6))
    emailOTP[email] = otp_code

    # Compose the message
    msg = MIMEText("Your OTP is " + otp_code)
    msg['From'] = "youssefmk1214@gmail.com"
    msg['To'] = email
    msg['Subject'] = "Shakwa Textual OTP"

    # Send via Gmail SMTP over TLS. GMAIL_APP_PASSWORD is an assumed
    # environment variable name; set it to the account's app password
    # rather than embedding the secret in the source.
    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.starttls()
    s.login("youssefmk1214@gmail.com", os.environ["GMAIL_APP_PASSWORD"])
    s.sendmail(msg['From'], msg['To'], msg.as_string())
    s.quit()
# Threading: a background worker summarizes queued complaints (shakwas)
queued_unsummarized_shakwas = []
shakwa_semaphore = threading.Semaphore(0)

def summarize_shakwas():
    # Block until on_snapshot() queues a complaint, then summarize it
    # and write the result back to Firestore
    global queued_unsummarized_shakwas, shakwa_semaphore, db
    while True:
        shakwa_semaphore.acquire()
        shakwa = queued_unsummarized_shakwas.pop(0)
        tmpdict = modelsummary(shakwa.complaintbody)
        print(tmpdict)
        shakwa.summary = tmpdict['summary']
        shakwa.set_data(db)

thread = threading.Thread(target=summarize_shakwas)
thread.start()
# Listening for changes to complaint documents
callback_done = threading.Event()

def on_snapshot(doc_snapshot, changes, read_time):
    global queued_unsummarized_shakwas, shakwa_semaphore
    for doc in doc_snapshot:
        shakwa = Classes.Shakwa.from_dict(source=doc.to_dict())
        # Queue any complaint that has not been summarized yet
        if shakwa.summary is None:
            queued_unsummarized_shakwas.append(shakwa)
            shakwa_semaphore.release()
    callback_done.set()

docwatch = db.collection("complaints").on_snapshot(on_snapshot)
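# How the pieces fit together (our reading of the code): Firestore invokes
# on_snapshot on its own callback thread whenever the "complaints" collection
# changes; unsummarized complaints are queued and the semaphore wakes
# summarize_shakwas(), which writes the summary back to Firestore. The write
# triggers another snapshot, whose documents now carry a summary and are
# skipped by the is-None check, so the loop terminates naturally.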
# Main server initialization
app = FastAPI()

# NOTE: the original file defined these handlers without route decorators and
# reused the name read_root for all of them; the paths below are assumed for
# illustration.
@app.get("/")
def index():
    return "Hello World"

@app.post("/summarize")
async def summarize(request: Request):
    json_data = await request.json()
    if 'text' in json_data:
        return modelsummary(json_data['text'])
    else:
        raise HTTPException(status_code=400, detail="Missing text value")

@app.post("/sendotp")
async def send_otp(request: Request):
    json_data = await request.json()
    if 'email' in json_data:
        generate_otp(json_data["email"])
        return "Message was sent successfully to " + json_data['email']
    else:
        raise HTTPException(status_code=400, detail="Missing email value")

@app.post("/verifyotp")
async def verify_otp(request: Request):
    json_data = await request.json()
    if 'email' in json_data and 'otpcode' in json_data:
        if json_data['otpcode'] == emailOTP[json_data['email']]:
            return "OTP verified successfully"
        else:
            return "OTP code is wrong"
    else:
        raise HTTPException(status_code=400, detail="Missing email value")

@app.post("/predict")
async def predict(request: Request):
    json_data = await request.json()
    # Note: clients send the method key spelled "mathod"; kept as-is
    if "mathod" in json_data and json_data["mathod"] == "emotion_predict" and 'text' in json_data:
        return modelpredict(json_data["text"])
    else:
        raise HTTPException(status_code=400, detail="Missing mathod value")
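# Illustrative requests against the assumed routes above (paths are ours,
# not confirmed by the original source):
#   curl -X POST http://localhost:8000/summarize \
#        -H "Content-Type: application/json" -d '{"text": "..."}'
#   curl -X POST http://localhost:8000/predict \
#        -H "Content-Type: application/json" \
#        -d '{"mathod": "emotion_predict", "text": "..."}'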
def getcommonwords():
    # Static sample of the most frequent complaint words and their counts
    return {'التسجيل': 23, 'مش': 19, 'تطبيق': 18, 'التطبيق': 18,
            'التفعيل': 17, 'كود': 13, 'ارسال': 12, 'تسجيل': 12,
            'يتم': 12, 'الرقم': 12}