import os
import fitz  # PyMuPDF
import tensorflow as tf
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import json
import re
# Folder containing all the PDF files to work on. The path below is just an example.
pdf_folder = '/Users/shivangsinha/Downloads/personalProject'
pdf_text_data = {}
embeddings = []
metadata = []
# Initialize the sentence transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
# model = SentenceTransformer('paraphrase-MiniLM-L6-v2')  # also tried this model, but the current one seems to work better
# Convert a TF string tensor to a Python string so it can be stored in JSON format
def tensor_to_string(tensor):
    return tensor.numpy().decode("utf-8")  # Assuming utf-8 encoding
# Extract text page by page so each result can be tied to a page number, making it more relevant for search
def extract_text_from_pdf_with_page_numbers(pdf_path):
    doc = fitz.open(pdf_path)
    text_pages = []
    for page_num in range(len(doc)):
        page = doc.load_page(page_num)
        text = page.get_text()
        text_pages.append((page_num + 1, text))  # Page numbers are 1-based in fitz
    return text_pages
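# Illustrative example (the path is hypothetical): the function returns a list of
# (page_number, text) tuples, e.g.
# extract_text_from_pdf_with_page_numbers('/path/to/book.pdf') -> [(1, "..."), (2, "..."), ...]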
# Make sure the input text is not coming from the table-of-contents section, and strip out
# everything that is irrelevant for search.
def custom_standardization(input_data):
    # A run of dots usually means a table-of-contents entry; simply ignore such text.
    index_pattern = re.compile(r'\.{3,}')
    if bool(index_pattern.search(input_data.numpy().decode('utf-8'))):
        return ""
    # Remove URLs
    stripped_urls = tf.strings.regex_replace(input_data, r"https?://\S+|www\.\S+", "")
    # Remove email addresses
    stripped_emails = tf.strings.regex_replace(stripped_urls, r"\S+@\S+", "")
    # Remove text in angle brackets (usually HTML tags)
    stripped_brackets = tf.strings.regex_replace(stripped_emails, r"<.*?>", "")
    # Remove square brackets but keep the text inside them
    stripped_square_brackets = tf.strings.regex_replace(stripped_brackets, r"\[|\]", "")
    # Remove words that contain digits
    stripped_digits = tf.strings.regex_replace(stripped_square_brackets, r"\w*\d\w*", "")
    # Remove non-alphabetic characters
    stripped_non_alpha = tf.strings.regex_replace(stripped_digits, r"[^a-zA-Z\s]", "")
    # Collapse multiple whitespace characters into a single space
    standardized_text = tf.strings.regex_replace(stripped_non_alpha, r"\s+", " ")
    return standardized_text.numpy().decode('utf-8')
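# Illustrative example (assuming eager execution, as used throughout this script):
# custom_standardization(tf.constant("Visit https://example.com for info!"))
#   -> "Visit for info"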
# For the time being the PDFs follow a numbered question-and-answer layout, so the text is
# split into blocks that start with a number followed by a period (e.g. "1.", "2.").
def split_into_paragraphs(text):
    pattern = r'(?<=\n)(?=\d+\.)'
    # Split text using the pattern
    paragraphs = re.split(pattern, text)
    # Strip leading/trailing whitespace from each paragraph and drop empty ones
    paragraphs = [paragraph.strip() for paragraph in paragraphs if paragraph.strip()]
    return paragraphs
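# Illustrative example:
# split_into_paragraphs("1. What is FAISS?\nA library.\n2. What is PyMuPDF?\nA PDF toolkit.")
#   -> ["1. What is FAISS?\nA library.", "2. What is PyMuPDF?\nA PDF toolkit."]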
# Encode a list of paragraphs into embedding vectors
def text_to_vectors(paragraphs):
    vectors = model.encode(paragraphs)
    return vectors
# Split a block into a question and an answer, used later to answer a query or simply
# show the relevant text from the book.
def split_into_qa(text):
    # Pattern used to detect table-of-contents style entries (runs of dots)
    index_pattern = re.compile(r'\.{3,}')
    # Capture everything up to the newline that follows the last question mark
    match = re.search(r'(.*\?.*?)\n', text, re.DOTALL)
    # If a match is found, split the text accordingly
    if match:
        question = match.group(1).strip()  # Everything up to the newline after the last question mark
        answer = text[match.end():].strip()  # Everything after that newline
        # Filter out index-like entries in both question and answer
        if index_pattern.search(question):
            question = ""  # Ignore this as it looks like an index entry
        if index_pattern.search(answer):
            answer = ""  # Ignore this as it looks like an index entry
    else:
        question = text.strip()  # No question mark found, treat the entire text as the question
        answer = ""  # No answer part
    return question, answer
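# Illustrative example:
# split_into_qa("2. What is FAISS?\nFAISS is a library for similarity search.")
#   -> ("2. What is FAISS?", "FAISS is a library for similarity search.")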
# Store each paragraph's vector and metadata so they can be used later while querying
def store_vectors(paragraphs, vectors, metadata, filename, page_num):
    for i, (paragraph, vector) in enumerate(zip(paragraphs, vectors)):
        original_text = paragraph
        question, answer = split_into_qa(original_text)
        original_text = paragraph[:500]  # Keep only the first 500 characters of the original text
        standardized_text = custom_standardization(tf.constant(paragraph))
        # Recompute the vector on the standardized text (this replaces the precomputed vector)
        vector = model.encode(standardized_text).tolist()
        metadata.append({
            "index": f'paragraph-{i}',
            "filename": filename,
            "page_num": page_num,
            "standardized_text": standardized_text,
            "question_text": question,
            "answerable_text": answer
        })
        embeddings.append(vector)
for filename in os.listdir(pdf_folder):
    if filename.endswith('.pdf'):
        pdf_path = os.path.join(pdf_folder, filename)
        text_pages = extract_text_from_pdf_with_page_numbers(pdf_path)
        for page_num, text in text_pages:
            paragraphs = split_into_paragraphs(text)
            vectors = text_to_vectors(paragraphs)
            store_vectors(paragraphs, vectors, metadata, filename, page_num)
        pdf_text_data[filename] = text_pages
# Save FAISS index and metadata to JSON
index_path = 'vector_indexNLP.faiss'
metadata_path = 'metadataNLP.json'
# Convert embeddings to a numpy array for FAISS
embeddings_array = np.array(embeddings, dtype='float32')
# Initialize FAISS index
dimension = embeddings_array.shape[1]  # Dimension of the embeddings
index = faiss.IndexFlatL2(dimension)
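# Note: IndexFlatL2 performs an exact (brute-force) L2 search. Sentence-transformer
# embeddings are often compared with cosine similarity instead; normalizing the vectors
# and using faiss.IndexFlatIP would be one possible alternative here.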
# Add embeddings in batches to avoid the memory issues I ran into when adding everything at once
batch_size = 1000  # Adjust batch size based on available memory
for i in range(0, len(embeddings), batch_size):
    batch_embeddings = embeddings_array[i:i+batch_size]
    index.add(batch_embeddings)
# Save the FAISS index
faiss.write_index(index, index_path)
# Save metadata
with open(metadata_path, 'w') as f:
    json.dump(metadata, f)
print(f"FAISS index saved to: {index_path}")
print(f"Metadata saved to: {metadata_path}")
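# --- Illustrative only: how the saved index and metadata could be queried later. ---
# This is a minimal sketch, not part of the indexing script above; it assumes the same
# model ('all-MiniLM-L6-v2') and the file names used above, and the helper name
# `search_index` is hypothetical.
def search_index(query, top_k=3):
    index = faiss.read_index(index_path)
    with open(metadata_path) as f:
        stored_metadata = json.load(f)
    # Embed the query with the same model used for indexing
    query_vector = np.array([model.encode(query)], dtype='float32')
    distances, ids = index.search(query_vector, top_k)
    results = []
    for dist, idx in zip(distances[0], ids[0]):
        if idx == -1:
            continue  # FAISS returns -1 when fewer than top_k vectors exist
        entry = stored_metadata[idx]  # metadata list order matches the FAISS insertion order
        results.append((entry["filename"], entry["page_num"], entry["question_text"], float(dist)))
    return results
# Example usage (illustrative):
# print(search_index("What is a vector index?"))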