import streamlit as st
from transformers import T5ForConditionalGeneration, T5Tokenizer
import spacy
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from rake_nltk import Rake
import pandas as pd
from fpdf import FPDF
import wikipediaapi
from functools import lru_cache
from nltk.tokenize import sent_tokenize
from nltk.corpus import wordnet
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('brown')
nltk.download('wordnet')
import random
import sense2vec
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import json
import os
from sentence_transformers import SentenceTransformer, util
import textstat
from spellchecker import SpellChecker
from transformers import pipeline
import re
import pymupdf
import uuid
print("***************************************************************")
st.set_page_config(
page_title="Question Generator",
initial_sidebar_state="auto",
menu_items={
"About" : "#Hi this our project."
}
)
# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.0 (channingfisher7@gmail.com)'
wiki_wiki = wikipediaapi.Wikipedia(user_agent=user_agent, language='en')
def get_session_id():
if 'session_id' not in st.session_state:
st.session_state.session_id = str(uuid.uuid4())
return st.session_state.session_id
def initialize_state(session_id):
if 'session_states' not in st.session_state:
st.session_state.session_states = {}
if session_id not in st.session_state.session_states:
st.session_state.session_states[session_id] = {
'generated_questions': [],
# add other state variables as needed
}
return st.session_state.session_states[session_id]
def get_state(session_id):
return st.session_state.session_states[session_id]
def set_state(session_id, key, value):
st.session_state.session_states[session_id][key] = value
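# Per-session state pattern: each browser session is keyed by a UUID inside
# st.session_state.session_states, intended so that concurrent users don't
# overwrite each other's generated questions. Typical call sequence
# (illustrative only):
#   session_id = get_session_id()
#   state = initialize_state(session_id)
#   set_state(session_id, 'generated_questions', [])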
@st.cache_resource
def load_model():
model_name = "DevBM/t5-large-squad"
model = T5ForConditionalGeneration.from_pretrained(model_name)
tokenizer = T5Tokenizer.from_pretrained(model_name)
return model, tokenizer
# Load Spacy Model
@st.cache_resource
def load_nlp_models():
nlp = spacy.load("en_core_web_md")
s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
return nlp, s2v
# Load Quality Assurance Models
@st.cache_resource
def load_qa_models():
# Initialize BERT model for sentence similarity
similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
spell = SpellChecker()
return similarity_model, spell
nlp, s2v = load_nlp_models()
model, tokenizer = load_model()
similarity_model, spell = load_qa_models()
context_model = similarity_model
def get_pdf_text(pdf_file):
doc = pymupdf.open(stream=pdf_file.read(), filetype="pdf")
text = ""
for page_num in range(doc.page_count):
page = doc.load_page(page_num)
text += page.get_text()
return text
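# Note: pymupdf (PyMuPDF) opens the uploaded file from an in-memory stream, so
# nothing is written to disk. Extraction is plain text per page; image-only or
# scanned PDFs yield empty text because no OCR is applied here.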
def save_feedback(question, answer,rating):
feedback_file = 'question_feedback.json'
if os.path.exists(feedback_file):
with open(feedback_file, 'r') as f:
feedback_data = json.load(f)
else:
feedback_data = []
tpl = {
'question' : question,
'answer' : answer,
'rating' : rating,
}
# feedback_data[question] = rating
feedback_data.append(tpl)
with open(feedback_file, 'w') as f:
json.dump(feedback_data, f)
# Function to clean text
def clean_text(text):
text = re.sub(r"[^\x00-\x7F]", " ", text)
return text
# Function to create text chunks
def segment_text(text, max_segment_length=1000):
"""Segment the text into smaller chunks."""
sentences = sent_tokenize(text)
segments = []
current_segment = ""
for sentence in sentences:
if len(current_segment) + len(sentence) <= max_segment_length:
current_segment += sentence + " "
else:
segments.append(current_segment.strip())
current_segment = sentence + " "
if current_segment:
segments.append(current_segment.strip())
print(f"\n\nSegement Chunks: {segments}\n\n")
return segments
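# Illustrative behaviour: with max_segment_length=1000, sentences are packed
# greedily into ~1000-character chunks; a sentence that would overflow the
# current chunk starts a new one, so sentences are never split across segments.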
# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
doc = nlp(text)
spacy_keywords = set([ent.text for ent in doc.ents])
spacy_entities = spacy_keywords
print(f"\n\nSpacy Entities: {spacy_entities} \n\n")
# Use Only Spacy Entities
if extract_all is False:
return list(spacy_entities)
# Use RAKE
rake = Rake()
rake.extract_keywords_from_text(text)
rake_keywords = set(rake.get_ranked_phrases())
print(f"\n\nRake Keywords: {rake_keywords} \n\n")
# Use spaCy for NER and POS tagging
spacy_keywords.update([token.text for token in doc if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"]])
print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")
# Use TF-IDF
vectorizer = TfidfVectorizer(stop_words='english')
X = vectorizer.fit_transform([text])
tfidf_keywords = set(vectorizer.get_feature_names_out())
print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")
# Combine all keywords
combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords)
return list(combined_keywords)
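# The combined keyword set is the union of three sources:
#   1. spaCy named entities (the only source when extract_all is False),
#   2. RAKE ranked phrases plus spaCy NOUN/PROPN/VERB/ADJ tokens,
#   3. TF-IDF vocabulary terms (English stop words removed).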
def get_similar_words_sense2vec(word, n=3):
# Try to find the word with its most likely part-of-speech
word_with_pos = word + "|NOUN"
if word_with_pos in s2v:
similar_words = s2v.most_similar(word_with_pos, n=n)
return [word.split("|")[0] for word, _ in similar_words]
# If not found, try without POS
if word in s2v:
similar_words = s2v.most_similar(word, n=n)
return [word.split("|")[0] for word, _ in similar_words]
return []
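# sense2vec keys take the form "<phrase>|<POS>", e.g. "machine_learning|NOUN";
# in the s2v_old vectors multi-word phrases are typically stored with
# underscores, so keywords containing spaces may simply miss the lookup and
# fall through to the empty-list return above.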
def get_synonyms(word, n=3):
synonyms = []
for syn in wordnet.synsets(word):
for lemma in syn.lemmas():
if lemma.name() != word and lemma.name() not in synonyms:
synonyms.append(lemma.name())
if len(synonyms) == n:
return synonyms
return synonyms
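# WordNet lemma names keep underscores for multi-word synonyms, e.g. the
# synsets of "car" include lemmas such as "automobile" and "motorcar".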
def generate_options(answer, context, n=3):
options = [answer]
# Add contextually relevant words using a pre-trained model
context_embedding = context_model.encode(context)
answer_embedding = context_model.encode(answer)
context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]
# Compute similarity scores and sort context words
similarity_scores = [util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item() for word in context_words]
sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)]
options.extend(sorted_context_words[:n])
# Try to get similar words based on sense2vec
similar_words = get_similar_words_sense2vec(answer, n)
options.extend(similar_words)
# If we don't have enough options, try synonyms
if len(options) < n + 1:
synonyms = get_synonyms(answer, n - len(options) + 1)
options.extend(synonyms)
# If we still don't have enough options, extract other entities from the context
if len(options) < n + 1:
doc = nlp(context)
entities = [ent.text for ent in doc.ents if ent.text.lower() != answer.lower()]
options.extend(entities[:n - len(options) + 1])
# If we still need more options, add some random words from the context
if len(options) < n + 1:
context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]
options.extend(random.sample(context_words, min(n - len(options) + 1, len(context_words))))
print(f"\n\nAll Possible Options: {options}\n\n")
# Ensure we have the correct number of unique options
options = list(dict.fromkeys(options))[:n+1]
# Shuffle the options
random.shuffle(options)
return options
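# Distractor priority: context words ranked by embedding similarity to the
# answer, then sense2vec neighbours, then WordNet synonyms, then other named
# entities, then random context words. Duplicates are dropped (keeping the
# first occurrence) before trimming to n+1 options and shuffling.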
# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
sentences = sent_tokenize(text)
keyword_sentence_mapping = {}
print(f"\n\nSentences: {sentences}\n\n")
for keyword in keywords:
for i, sentence in enumerate(sentences):
if keyword in sentence:
# Combine current sentence with surrounding sentences for context
start = max(0, i - context_window_size)
end = min(len(sentences), i + context_window_size + 1)
context = ' '.join(sentences[start:end])
if keyword not in keyword_sentence_mapping:
keyword_sentence_mapping[keyword] = context
else:
keyword_sentence_mapping[keyword] += ' ' + context
return keyword_sentence_mapping
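# Illustrative behaviour: with context_window_size=1, a keyword found in
# sentence i is mapped to sentences i-1..i+1 joined together; if the keyword
# occurs in several sentences, the contexts are concatenated.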
# Function to perform entity linking using Wikipedia API
@lru_cache(maxsize=128)
def entity_linking(keyword):
page = wiki_wiki.page(keyword)
if page.exists():
return page.fullurl
return None
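# lru_cache avoids repeated Wikipedia lookups for the same keyword within a
# single process; up to 128 distinct keywords are cached.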
# Function to generate questions using beam search
def generate_question(context, answer, num_beams):
input_text = f"<context> {context} <answer> {answer}"
input_ids = tokenizer.encode(input_text, return_tensors='pt')
outputs = model.generate(input_ids, num_beams=num_beams, early_stopping=True)
question = tokenizer.decode(outputs[0], skip_special_tokens=True)
return question
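# The prompt format "<context> ... <answer> ..." is the input convention this
# app uses for the fine-tuned T5 checkpoint; raising num_beams trades
# generation speed for (usually) better-phrased questions.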
# Function to export questions to CSV
def export_to_csv(data):
# df = pd.DataFrame(data, columns=["Context", "Answer", "Question", "Options"])
df = pd.DataFrame(data)
# csv = df.to_csv(index=False,encoding='utf-8')
csv = df.to_csv(index=False)
return csv
# Function to export questions to PDF
def export_to_pdf(data):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
for item in data:
pdf.multi_cell(0, 10, f"Context: {item['context']}")
pdf.multi_cell(0, 10, f"Question: {item['question']}")
pdf.multi_cell(0, 10, f"Answer: {item['answer']}")
pdf.multi_cell(0, 10, f"Options: {', '.join(item['options'])}")
pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}")
pdf.ln(10)
return pdf.output(dest='S').encode('latin-1')
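# FPDF's built-in fonts are Latin-1 only, hence the latin-1 encoding of the
# in-memory output; clean_text() has already stripped non-ASCII characters
# from the source text upstream.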
def display_word_cloud(generated_questions):
word_frequency = {}
for question in generated_questions:
words = question.split()
for word in words:
word_frequency[word] = word_frequency.get(word, 0) + 1
wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    st.pyplot(fig)
def assess_question_quality(context, question, answer):
# Assess relevance using cosine similarity
context_doc = nlp(context)
question_doc = nlp(question)
relevance_score = context_doc.similarity(question_doc)
# Assess complexity using token length (as a simple metric)
complexity_score = min(len(question_doc) / 20, 1) # Normalize to 0-1
# Assess Spelling correctness
    words = question.split()
    misspelled = spell.unknown(words)
    spelling_correctness = 1 - (len(misspelled) / len(words)) if words else 0  # Normalize to 0-1
# Calculate overall score (you can adjust weights as needed)
overall_score = (
0.4 * relevance_score +
0.4 * complexity_score +
0.2 * spelling_correctness
)
return overall_score, relevance_score, complexity_score, spelling_correctness
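# Worked example (illustrative numbers): relevance 0.80, complexity 0.50,
# spelling 1.00 -> overall = 0.4*0.80 + 0.4*0.50 + 0.2*1.00 = 0.72, which
# clears the 0.5 threshold applied in main() before a question is kept.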
def main():
# Streamlit interface
st.title(":blue[Question Generator System]")
session_id = get_session_id()
state = initialize_state(session_id)
    # Legacy session-wide list; per-user questions now live in
    # state['generated_questions'], keyed by session_id
    if 'generated_questions' not in st.session_state:
        st.session_state.generated_questions = []
with st.sidebar:
st.subheader("Customization Options")
# Customization options
input_type = st.radio("Select Input Preference", ("Text Input","Upload PDF"))
num_beams = st.slider("Select number of beams for question generation", min_value=1, max_value=10, value=5)
context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
with st.expander("Choose the Additional Elements to show"):
show_context = st.checkbox("Context",True)
show_answer = st.checkbox("Answer",True)
show_options = st.checkbox("Options",False)
show_entity_link = st.checkbox("Entity Link For Wikipedia",True)
show_qa_scores = st.checkbox("QA Score",False)
col1, col2 = st.columns(2)
with col1:
extract_all_keywords = st.toggle("Extract Max Keywords",value=False)
with col2:
enable_feedback_mode = st.toggle("Enable Feedback Mode",False)
text = None
if input_type == "Text Input":
text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.")
elif input_type == "Upload PDF":
file = st.file_uploader("Upload PDF Files")
if file is not None:
text = get_pdf_text(file)
if text:
text = clean_text(text)
segments = segment_text(text)
generate_questions_button = st.button("Generate Questions")
if generate_questions_button and text:
state['generated_questions'] = []
# st.session_state.generated_questions = []
for text in segments:
keywords = extract_keywords(text, extract_all_keywords)
print(f"\n\nFinal Keywords in Main Function: {keywords}\n\n")
keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size)
for i, (keyword, context) in enumerate(keyword_sentence_mapping.items()):
if i >= num_questions:
break
question = generate_question(context, keyword, num_beams=num_beams)
options = generate_options(keyword,context)
overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context,question,keyword)
if overall_score < 0.5:
continue
tpl = {
"question" : question,
"context" : context,
"answer" : keyword,
"options" : options,
"overall_score" : overall_score,
"relevance_score" : relevance_score,
"complexity_score" : complexity_score,
"spelling_correctness" : spelling_correctness,
}
# st.session_state.generated_questions.append(tpl)
state['generated_questions'].append(tpl)
set_state(session_id, 'generated_questions', state['generated_questions'])
# sort question based on their quality score
# st.session_state.generated_questions = sorted(st.session_state.generated_questions,key = lambda x: x['overall_score'], reverse=True)
state['generated_questions'] = sorted(state['generated_questions'],key = lambda x: x['overall_score'], reverse=True)
# Display generated questions
# if st.session_state.generated_questions:
if state['generated_questions']:
st.header("Generated Questions:",divider='blue')
        for i, q in enumerate(state['generated_questions']):
# with st.expander(f"Question {i+1}"):
st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
if show_context is True:
st.write(f"**Context:** {q['context']}")
if show_answer is True:
st.write(f"**Answer:** {q['answer']}")
if show_options is True:
st.write(f"**Options:**")
for j, option in enumerate(q['options']):
st.write(f"{chr(65+j)}. {option}")
if show_entity_link is True:
linked_entity = entity_linking(q['answer'])
if linked_entity:
st.write(f"**Entity Link:** {linked_entity}")
if show_qa_scores is True:
m1,m2,m3,m4 = st.columns([1.7,1,1,1])
m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}")
m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}")
m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}")
m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}")
# q['context'] = st.text_area(f"Edit Context {i+1}:", value=q['context'], key=f"context_{i}")
if enable_feedback_mode:
q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
q['rating'] = st.selectbox(f"Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
save_feedback(q['question'], q['answer'], q['rating'])
st.success(f"Feedback submitted for Question {i+1}")
st.write("---")
# Export buttons
# if st.session_state.generated_questions:
if state['generated_questions']:
with st.sidebar:
            csv_data = export_to_csv(state['generated_questions'])
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(state['generated_questions'])
st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')
# View Feedback Statistics
with st.expander("View Feedback Statistics"):
feedback_file = 'question_feedback.json'
if os.path.exists(feedback_file):
with open(feedback_file, 'r') as f:
feedback_data = json.load(f)
st.subheader("Feedback Statistics")
# Calculate average rating
ratings = [feedback['rating'] for feedback in feedback_data]
avg_rating = sum(ratings) / len(ratings) if ratings else 0
st.write(f"Average Question Rating: {avg_rating:.2f}")
# Show distribution of ratings
rating_counts = {i: ratings.count(i) for i in range(1, 6)}
            st.bar_chart(pd.Series(rating_counts))
# Show some highly rated questions
st.subheader("Highly Rated Questions")
sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
top_questions = sorted_feedback[:5]
for feedback in top_questions:
st.write(f"Question: {feedback['question']}")
st.write(f"Answer: {feedback['answer']}")
st.write(f"Rating: {feedback['rating']}")
st.write("---")
else:
st.write("No feedback data available yet.")
print("********************************************************************************")
if __name__ == '__main__':
main()