import streamlit as st
from transformers import T5ForConditionalGeneration, T5Tokenizer
import spacy
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from rake_nltk import Rake
import pandas as pd
from fpdf import FPDF
import wikipediaapi
from functools import lru_cache
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('brown')
from nltk.tokenize import sent_tokenize
nltk.download('wordnet')
from nltk.corpus import wordnet
import random
import sense2vec
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import json
import os
from sentence_transformers import SentenceTransformer, util
import textstat
from spellchecker import SpellChecker
from transformers import pipeline
import re
import pymupdf
import uuid

print("***************************************************************")

st.set_page_config(
    page_title="Question Generator",
    initial_sidebar_state="auto",
    menu_items={
        "About" : "#Hi this our project."
    }
)
# st.set_option(deprecation.showPyplotGlobalUse=False)
# st.set_option('base','dark')

# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.0 (channingfisher7@gmail.com)'
wiki_wiki = wikipediaapi.Wikipedia(user_agent=user_agent, language='en')


def get_session_id():
    """Return the id stored in ``st.session_state``, creating a UUID4 on first use."""
    if 'session_id' not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())
    return st.session_state.session_id


def initialize_state(session_id):
    """Return the state dict for *session_id*, creating it if it does not exist.

    A new state dict starts with an empty ``'generated_questions'`` list.
    """
    if 'session_states' not in st.session_state:
        st.session_state.session_states = {}
    if session_id not in st.session_state.session_states:
        st.session_state.session_states[session_id] = {
            'generated_questions': [],
            # add other state variables as needed
        }
    return st.session_state.session_states[session_id]


def get_state(session_id):
    """Return the state dict stored for *session_id* (``KeyError`` if absent)."""
    return st.session_state.session_states[session_id]


def set_state(session_id, key, value):
    """Store *value* under *key* in the state dict of *session_id*."""
    st.session_state.session_states[session_id][key] = value


@st.cache_resource
def load_model():
    """Load the T5 question-generation model and its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    model_name = "DevBM/t5-small-squad"
    model = T5ForConditionalGeneration.from_pretrained(model_name)
    tokenizer = T5Tokenizer.from_pretrained(model_name)
    return model, tokenizer


# Load Spacy Model
@st.cache_resource
def load_nlp_models():
    """Load the spaCy pipeline and the sense2vec vectors from ``'s2v_old'``.

    Returns:
        A ``(nlp, s2v)`` tuple.
    """
    nlp = spacy.load("en_core_web_md")
    s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
    return nlp, s2v


# Load Quality Assurance Models
@st.cache_resource
def load_qa_models():
    """Load the sentence-similarity model and the spell checker.

    Returns:
        A ``(similarity_model, spell)`` tuple.
    """
    # Sentence-embedding model used for similarity scoring
    similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
    spell = SpellChecker()
    return similarity_model, spell


nlp, s2v = load_nlp_models()
model, tokenizer = load_model()
similarity_model, spell = load_qa_models()
context_model = similarity_model


def get_pdf_text(pdf_file):
    """Return the concatenated text of every page of an uploaded PDF.

    Args:
        pdf_file: A binary file-like object; its full content is read.

    Returns:
        The text of all pages joined in page order.
    """
    # The context manager closes the document even if extraction fails.
    with pymupdf.open(stream=pdf_file.read(), filetype="pdf") as doc:
        return "".join(
            doc.load_page(page_num).get_text()
            for page_num in range(doc.page_count)
        )


def save_feedback(question, answer, rating):
    """Append one feedback record to ``question_feedback.json``.

    The file holds a JSON list of ``{'question', 'answer', 'rating'}`` dicts
    and is created when it does not exist yet.
    """
    feedback_file = 'question_feedback.json'
    if os.path.exists(feedback_file):
        with open(feedback_file, 'r') as f:
            feedback_data = json.load(f)
    else:
        feedback_data = []
    tpl = {
        'question' : question,
        'answer' : answer,
        'rating' : rating,
    }
    feedback_data.append(tpl)
    with open(feedback_file, 'w') as f:
        json.dump(feedback_data, f)


# Function to clean text
def clean_text(text):
    """Replace every non-ASCII character in *text* with a single space."""
    return re.sub(r"[^\x00-\x7F]", " ", text)


# Function to create text chunks
def segment_text(text, max_segment_length=1000):
    """Segment the text into smaller chunks.

    Sentences are packed greedily into chunks of at most
    *max_segment_length* characters. A single sentence longer than the limit
    becomes a chunk of its own; no empty chunk is ever produced.

    Returns:
        A list of stripped chunk strings.
    """
    sentences = sent_tokenize(text)
    segments = []
    current_segment = ""
    for sentence in sentences:
        if len(current_segment) + len(sentence) <= max_segment_length:
            current_segment += sentence + " "
        else:
            # Skip the flush when nothing has been accumulated yet, otherwise
            # an over-long first sentence would emit an empty chunk.
            if current_segment:
                segments.append(current_segment.strip())
            current_segment = sentence + " "
    if current_segment:
        segments.append(current_segment.strip())
    print(f"\n\nSegement Chunks: {segments}\n\n")
    return segments


# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
    """Extract candidate answer keywords from *text*.

    Args:
        text: The passage to analyse.
        extract_all: When falsy, only spaCy named entities are returned.
            When truthy, RAKE phrases, spaCy NOUN/PROPN/VERB/ADJ tokens and
            the TF-IDF vocabulary are added as well.

    Returns:
        A list of unique keyword strings (order is not significant).
    """
    doc = nlp(text)
    spacy_entities = {ent.text for ent in doc.ents}
    print(f"\n\nSpacy Entities: {spacy_entities} \n\n")

    # Use Only Spacy Entities
    if not extract_all:
        return list(spacy_entities)

    # Use RAKE
    rake = Rake()
    rake.extract_keywords_from_text(text)
    rake_keywords = set(rake.get_ranked_phrases())
    print(f"\n\nRake Keywords: {rake_keywords} \n\n")

    # Use spaCy for NER and POS tagging
    spacy_keywords = set(spacy_entities)
    spacy_keywords.update(
        token.text for token in doc
        if token.pos_ in ("NOUN", "PROPN", "VERB", "ADJ")
    )
    print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")

    # Use TF-IDF
    vectorizer = TfidfVectorizer(stop_words='english')
    try:
        vectorizer.fit([text])
        tfidf_keywords = set(vectorizer.get_feature_names_out())
    except ValueError:
        # Raised for an empty vocabulary, e.g. text made only of stop words.
        tfidf_keywords = set()
    print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")

    # Combine all keywords
    combined_keywords = rake_keywords | spacy_keywords | tfidf_keywords
    return list(combined_keywords)


def get_similar_words_sense2vec(word, n=3):
    """Return up to *n* sense2vec neighbours of *word*, without sense tags.

    The ``word|NOUN`` key is tried first and the bare word second; an empty
    list is returned when neither key is in the vectors.
    """
    for key in (word + "|NOUN", word):
        if key in s2v:
            return [
                similar.split("|")[0]
                for similar, _ in s2v.most_similar(key, n=n)
            ]
    return []


def get_synonyms(word, n=3):
    """Return up to *n* distinct WordNet lemma names for *word*, excluding *word*."""
    if n <= 0:
        return []
    synonyms = []
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            name = lemma.name()
            if name != word and name not in synonyms:
                synonyms.append(name)
                if len(synonyms) == n:
                    return synonyms
    return synonyms


def generate_options(answer, context, n=3):
    """Build a shuffled list of multiple-choice options containing *answer*.

    Distractors are taken, in priority order, from: the *n* context words
    most similar to the answer, sense2vec neighbours, WordNet synonyms,
    other named entities in the context, and finally the remaining context
    words. Candidates are de-duplicated case-insensitively as they are
    added, so every source is used to fill the slots still missing.

    Args:
        answer: The correct answer; always present in the result.
        context: The passage the question was generated from.
        n: Number of distractors wanted.

    Returns:
        A shuffled list of at most ``n + 1`` unique options.
    """
    target = n + 1
    options = [answer]
    seen = {answer.lower()}

    def add_candidates(candidates):
        # Append unseen candidates until the option list is full.
        for candidate in candidates:
            if len(options) >= target:
                return
            key = candidate.lower()
            if key not in seen:
                seen.add(key)
                options.append(candidate)

    # Parse the context once and reuse it for both words and entities.
    doc = nlp(context)
    context_words = list(dict.fromkeys(
        token.text for token in doc
        if token.is_alpha and token.text.lower() != answer.lower()
    ))

    # Rank the unique context words by similarity to the answer, encoding
    # them in a single batch rather than one model call per word.
    ranked_words = []
    if context_words:
        answer_embedding = context_model.encode(answer)
        word_embeddings = context_model.encode(context_words)
        similarities = util.pytorch_cos_sim(word_embeddings, answer_embedding)
        similarity_scores = [row[0] for row in similarities.tolist()]
        ranked_words = [
            word for _, word in
            sorted(zip(similarity_scores, context_words), reverse=True)
        ]
    add_candidates(ranked_words[:n])

    # Try to get similar words based on sense2vec
    if len(options) < target:
        add_candidates(get_similar_words_sense2vec(answer, n))

    # If we don't have enough options, try synonyms
    if len(options) < target:
        add_candidates(get_synonyms(answer, target - len(options)))

    # If we still don't have enough options, use other entities from the context
    if len(options) < target:
        add_candidates(
            ent.text for ent in doc.ents
            if ent.text.lower() != answer.lower()
        )

    # Last resort: the lower-ranked context words
    if len(options) < target:
        add_candidates(ranked_words[n:])

    print(f"\n\nAll Possible Options: {options}\n\n")

    # Shuffle the options
    random.shuffle(options)
    return options


# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
    """Map each keyword to the sentences surrounding its occurrences.

    For every sentence containing the keyword (substring match), that
    sentence plus *context_window_size* sentences on each side are selected.
    Overlapping windows are merged, so each sentence appears at most once and
    in document order.

    Returns:
        A dict ``{keyword: context}``; keywords found in no sentence (and
        empty keywords) are omitted.
    """
    sentences = sent_tokenize(text)
    keyword_sentence_mapping = {}
    print(f"\n\nSentences: {sentences}\n\n")
    for keyword in keywords:
        if not keyword:
            # An empty string is a substring of every sentence.
            continue
        selected = set()
        for i, sentence in enumerate(sentences):
            if keyword in sentence:
                start = max(0, i - context_window_size)
                end = min(len(sentences), i + context_window_size + 1)
                selected.update(range(start, end))
        if selected:
            keyword_sentence_mapping[keyword] = ' '.join(
                sentences[j] for j in sorted(selected)
            )
    return keyword_sentence_mapping


# Function to perform entity linking using Wikipedia API
@lru_cache(maxsize=128)
def entity_linking(keyword):
    """Return the Wikipedia URL of the page titled *keyword*, or ``None``.

    Results are memoised (up to 128 keywords).
    """
    page = wiki_wiki.page(keyword)
    if page.exists():
        return page.fullurl
    return None


# Function to generate questions using beam search
def generate_question(context, answer, num_beams):
    """Generate one question about *answer* given *context* using beam search.

    Returns:
        The decoded question string.
    """
    # NOTE(review): the prompt carries no task/field markers; confirm this
    # matches the input format the model was fine-tuned on.
    input_text = f" {context} {answer}"
    input_ids = tokenizer.encode(input_text, return_tensors='pt')
    outputs = model.generate(input_ids, num_beams=num_beams, early_stopping=True)
    question = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return question


# Function to export questions to CSV
def export_to_csv(data):
    """Return *data* (a list of question dicts) serialised as CSV text."""
    df = pd.DataFrame(data)
    return df.to_csv(index=False)


def _latin1_safe(value):
    """Return *value* as text with non-Latin-1 characters replaced by '?'."""
    return str(value).encode('latin-1', 'replace').decode('latin-1')


# Function to export questions to PDF
def export_to_pdf(data):
    """Render the question dicts in *data* to a PDF and return its bytes.

    Each item must provide ``context``, ``question``, ``answer``, ``options``
    and ``overall_score``.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for item in data:
        lines = (
            f"Context: {item['context']}",
            f"Question: {item['question']}",
            f"Answer: {item['answer']}",
            f"Options: {', '.join(item['options'])}",
            f"Overall Score: {item['overall_score']:.2f}",
        )
        for line in lines:
            # The built-in fonts only cover Latin-1; sanitise so one stray
            # character cannot abort the whole export.
            pdf.multi_cell(0, 10, _latin1_safe(line))
        pdf.ln(10)
    raw = pdf.output(dest='S')
    # PyFPDF returns a str here, fpdf2 a bytearray; accept both.
    if isinstance(raw, str):
        return raw.encode('latin-1')
    return bytes(raw)


def display_word_cloud(generated_questions):
    """Render a word cloud of the whitespace-separated words in the questions.

    Nothing is drawn when the questions contain no words.
    """
    word_frequency = {}
    for question in generated_questions:
        for word in question.split():
            word_frequency[word] = word_frequency.get(word, 0) + 1
    if not word_frequency:
        return
    wordcloud = WordCloud(
        width=800, height=400, background_color='white'
    ).generate_from_frequencies(word_frequency)
    # Draw on an explicit figure instead of the global pyplot state and
    # release it afterwards so reruns do not accumulate open figures.
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    st.pyplot(fig)
    plt.close(fig)


def assess_question_quality(context, question, answer):
    """Score a generated question.

    Args:
        context: Passage the question was generated from.
        question: The generated question text.
        answer: The expected answer (currently not used in the score).

    Returns:
        ``(overall_score, relevance_score, complexity_score,
        spelling_correctness)``. Overall is the weighted sum
        0.4 * relevance + 0.4 * complexity + 0.2 * spelling. An empty
        question scores 0.0 on every metric.
    """
    words = question.split()
    if not words:
        # Avoid dividing by zero below; an empty question has no quality.
        return 0.0, 0.0, 0.0, 0.0

    # Assess relevance using spaCy document similarity
    context_doc = nlp(context)
    question_doc = nlp(question)
    relevance_score = context_doc.similarity(question_doc)

    # Assess complexity using token length (as a simple metric)
    complexity_score = min(len(question_doc) / 20, 1)  # Normalize to 0-1

    # Assess Spelling correctness
    misspelled = spell.unknown(words)
    spelling_correctness = 1 - (len(misspelled) / len(words))  # Normalize to 0-1

    # Calculate overall score (you can adjust weights as needed)
    overall_score = (
        0.4 * relevance_score +
        0.4 * complexity_score +
        0.2 * spelling_correctness
    )
    return overall_score, relevance_score, complexity_score, spelling_correctness


def main():
    """Run the Streamlit page: collect input, generate, display and export questions."""
    # Streamlit interface
    st.title(":blue[Question Generator System]")
    session_id = get_session_id()
    state = initialize_state(session_id)

    with st.sidebar:
        st.subheader("Customization Options")
        # Customization options
        input_type = st.radio("Select Input Preference", ("Text Input","Upload PDF"))
        num_beams = st.slider("Select number of beams for question generation", min_value=1, max_value=10, value=5)
        context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
        num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
        with st.expander("Choose the Additional Elements to show"):
            show_context = st.checkbox("Context",True)
            show_answer = st.checkbox("Answer",True)
            show_options = st.checkbox("Options",False)
            show_entity_link = st.checkbox("Entity Link For Wikipedia",True)
            show_qa_scores = st.checkbox("QA Score",False)
        col1, col2 = st.columns(2)
        with col1:
            extract_all_keywords = st.toggle("Extract Max Keywords",value=False)
        with col2:
            enable_feedback_mode = st.toggle("Enable Feedback Mode",False)

    text = None
    if input_type == "Text Input":
        text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.")
    elif input_type == "Upload PDF":
        file = st.file_uploader("Upload PDF Files")
        if file is not None:
            text = get_pdf_text(file)

    # Always define `segments` so the button handler never hits a NameError
    # when no text was entered or no PDF was uploaded.
    segments = []
    if text:
        text = clean_text(text)
        segments = segment_text(text)

    generate_questions_button = st.button("Generate Questions")
    if generate_questions_button:
        if not segments:
            st.warning("Please enter some text or upload a PDF before generating questions.")
        else:
            state['generated_questions'] = []
            q_count = 0
            for segment in segments:
                # Stop as soon as the requested number of questions exists.
                if q_count >= num_questions:
                    break
                keywords = extract_keywords(segment, extract_all_keywords)
                print(f"\n\nFinal Keywords in Main Function: {keywords}\n\n")
                keyword_sentence_mapping = map_keywords_to_sentences(segment, keywords, context_window_size)
                for i, (keyword, context) in enumerate(keyword_sentence_mapping.items()):
                    if i >= num_questions or q_count >= num_questions:
                        break
                    question = generate_question(context, keyword, num_beams=num_beams)
                    options = generate_options(keyword, context)
                    overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context, question, keyword)
                    # Low-quality questions are dropped and do not count.
                    if overall_score < 0.5:
                        continue
                    tpl = {
                        "question" : question,
                        "context" : context,
                        "answer" : keyword,
                        "options" : options,
                        "overall_score" : overall_score,
                        "relevance_score" : relevance_score,
                        "complexity_score" : complexity_score,
                        "spelling_correctness" : spelling_correctness,
                    }
                    print("\n\n",tpl,"\n\n")
                    state['generated_questions'].append(tpl)
                    q_count += 1
            print("\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n")
            print(get_state(session_id))
            set_state(session_id, 'generated_questions', state['generated_questions'])

    # sort question based on their quality score
    state['generated_questions'] = sorted(
        state['generated_questions'],
        key=lambda x: x['overall_score'],
        reverse=True,
    )

    # Display generated questions
    if state['generated_questions']:
        st.header("Generated Questions:",divider='blue')
        for i, q in enumerate(state['generated_questions']):
            st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
            if show_context:
                st.write(f"**Context:** {q['context']}")
            if show_answer:
                st.write(f"**Answer:** {q['answer']}")
            if show_options:
                st.write("**Options:**")
                for j, option in enumerate(q['options']):
                    st.write(f"{chr(65+j)}. {option}")
            if show_entity_link:
                linked_entity = entity_linking(q['answer'])
                if linked_entity:
                    st.write(f"**Entity Link:** {linked_entity}")
            if show_qa_scores:
                m1, m2, m3, m4 = st.columns([1.7,1,1,1])
                m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}")
                m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}")
                m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}")
                m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}")
            if enable_feedback_mode:
                q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
                q['rating'] = st.selectbox("Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
                if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
                    save_feedback(q['question'], q['answer'], q['rating'])
                    st.success(f"Feedback submitted for Question {i+1}")
            st.write("---")

    # Export buttons
    if state['generated_questions']:
        with st.sidebar:
            csv_data = export_to_csv(state['generated_questions'])
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(state['generated_questions'])
            st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')
        with st.expander("View Visualizations"):
            questions = [tpl['question'] for tpl in state['generated_questions']]
            overall_scores = [tpl['overall_score'] for tpl in state['generated_questions']]
            st.subheader('WordCloud of Questions',divider='rainbow')
            display_word_cloud(questions)
            st.subheader('Overall Scores',divider='violet')
            overall_scores = pd.DataFrame(overall_scores,columns=['Overall Scores'])
            st.line_chart(overall_scores)

    # View Feedback Statistics
    with st.expander("View Feedback Statistics"):
        feedback_file = 'question_feedback.json'
        if os.path.exists(feedback_file):
            with open(feedback_file, 'r') as f:
                feedback_data = json.load(f)
            st.subheader("Feedback Statistics")

            # Calculate average rating
            ratings = [feedback['rating'] for feedback in feedback_data]
            avg_rating = sum(ratings) / len(ratings) if ratings else 0
            st.write(f"Average Question Rating: {avg_rating:.2f}")

            # Show distribution of ratings
            rating_counts = {i: ratings.count(i) for i in range(1, 6)}
            st.bar_chart(rating_counts)

            # Show some highly rated questions
            st.subheader("Highly Rated Questions")
            sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
            top_questions = sorted_feedback[:5]
            for feedback in top_questions:
                st.write(f"Question: {feedback['question']}")
                st.write(f"Answer: {feedback['answer']}")
                st.write(f"Rating: {feedback['rating']}")
                st.write("---")
        else:
            st.write("No feedback data available yet.")

    print("********************************************************************************")


if __name__ == '__main__':
    main()