import os
import fitz # PyMuPDF
import tensorflow as tf
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import json
import re
# This folder should contain all the PDF files we want to index. The path below is just an example.
pdf_folder = '/Users/shivangsinha/Downloads/personalProject'
pdf_text_data = {}
embeddings = []
metadata = []
# Initialize the sentence transformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
# model = SentenceTransformer('paraphrase-MiniLM-L6-v2')  # Also tried this model, but the current one works better.
# Convert a tensor to a string so it can be stored in JSON format.
def tensor_to_string(tensor):
    return tensor.numpy().decode("utf-8")  # Assuming UTF-8 encoding
# Extract text page by page so that search results can point back to the page they came from.
def extract_text_from_pdf_with_page_numbers(pdf_path):
    doc = fitz.open(pdf_path)
    text_pages = []
    for page_num in range(len(doc)):
        page = doc.load_page(page_num)
        text = page.get_text()
        text_pages.append((page_num + 1, text))  # Store 1-based page numbers (fitz pages are 0-indexed)
    return text_pages
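# Returns a list of (page_number, page_text) tuples, e.g. [(1, "text of page 1"), (2, "text of page 2"), ...]
# (the example values here are illustrative only).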
# Skip text that looks like it comes from the table of contents, and strip out text that is irrelevant for search.
def custom_standardization(input_data):
    # If the text contains a run of dots it is probably a table-of-contents entry, so ignore it.
    index_pattern = re.compile(r'\.{3,}')
    if bool(index_pattern.search(input_data.numpy().decode('utf-8'))):
        return ""
    # Remove URLs
    stripped_urls = tf.strings.regex_replace(input_data, r"https?://\S+|www\.\S+", "")
    # Remove email addresses
    stripped_emails = tf.strings.regex_replace(stripped_urls, r"\S+@\S+", "")
    # Remove text in angular brackets (usually HTML tags)
    stripped_brackets = tf.strings.regex_replace(stripped_emails, r"<.*?>", "")
    # Remove square brackets but keep the text inside them
    stripped_square_brackets = tf.strings.regex_replace(stripped_brackets, r"\[|\]", "")
    # Remove words that contain digits
    stripped_digits = tf.strings.regex_replace(stripped_square_brackets, r"\w*\d\w*", "")
    # Remove non-alphabet characters
    stripped_non_alpha = tf.strings.regex_replace(stripped_digits, r"[^a-zA-Z\s]", "")
    # Replace multiple whitespace characters with a single space
    standardized_text = tf.strings.regex_replace(stripped_non_alpha, r"\s+", " ")
    return standardized_text.numpy().decode('utf-8')
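# Example (illustrative): custom_standardization(tf.constant("Visit https://example.com for Q4 2023 results!"))
# returns "Visit for results" -- the URL, digit-bearing tokens, and punctuation are stripped and whitespace collapsed.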
# For the time being the PDFs are assumed to follow a question-and-answer pattern, so the text is split
# into paragraphs at numbered items (a digit followed by a dot, e.g. "1.", "2.") that start on a new line.
def split_into_paragraphs(text):
    pattern = r'(?<=\n)(?=\d+\.)'
    # Split text using the pattern
    paragraphs = re.split(pattern, text)
    # Remove leading/trailing whitespace from each paragraph and filter out empty paragraphs
    paragraphs = [paragraph.strip() for paragraph in paragraphs if paragraph.strip()]
    return paragraphs
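# Example (illustrative): a page containing "Intro\n1. What is X?\nX is a thing.\n2. What is Y?\nY is another."
# is split into ["Intro", "1. What is X?\nX is a thing.", "2. What is Y?\nY is another."].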
# Encode a list of paragraphs into embedding vectors.
def text_to_vectors(paragraphs):
    vectors = model.encode(paragraphs)
    return vectors
# Split a paragraph into a question part and an answer part, so we can either answer a query
# or simply show the relevant text from the book.
def split_into_qa(text):
    # Pattern used to detect index-like entries (runs of three or more dots)
    index_pattern = re.compile(r'\.{3,}')
    # Split the text at the last question mark that is followed by a newline
    match = re.search(r'(.*\?.*?)\n', text, re.DOTALL)
    # If a match is found, split the text accordingly
    if match:
        question = match.group(1).strip()  # The part up to and including the last question mark
        answer = text[match.end():].strip()  # The part after the matched question line
        # Filter out index-like entries in both question and answer
        if index_pattern.search(question):
            question = ""  # Ignore this as it looks like an index entry
        if index_pattern.search(answer):
            answer = ""  # Ignore this as it looks like an index entry
    else:
        question = text.strip()  # No question mark found, so treat the entire text as the question
        answer = ""  # No answer part
    return question, answer
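# Example (illustrative): split_into_qa("3. What is an embedding?\nAn embedding is a dense vector representation of text.")
# returns ("3. What is an embedding?", "An embedding is a dense vector representation of text.").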
# Store vectors and metadata so they can be used later at query time.
def store_vectors(paragraphs, vectors, metadata, filename, page_num):
    for i, (paragraph, vector) in enumerate(zip(paragraphs, vectors)):
        original_text = paragraph
        question, answer = split_into_qa(original_text)
        original_text = paragraph[:500]  # Keep only the first 500 characters of the original text
        standardized_text = custom_standardization(tf.constant(paragraph))
        vector = model.encode(standardized_text).tolist()  # Recompute the vector for the standardized text
        metadata.append({
            "index": f'paragraph-{i}',
            "filename": filename,
            "page_num": page_num,
            "original_text": original_text,
            "standardized_text": standardized_text,
            "question_text": question,
            "answerable_text": answer
        })
        embeddings.append(vector)
for filename in os.listdir(pdf_folder):
    if filename.endswith('.pdf'):
        pdf_path = os.path.join(pdf_folder, filename)
        text_pages = extract_text_from_pdf_with_page_numbers(pdf_path)
        for page_num, text in text_pages:
            paragraphs = split_into_paragraphs(text)
            vectors = text_to_vectors(paragraphs)
            store_vectors(paragraphs, vectors, metadata, filename, page_num)
        pdf_text_data[filename] = text_pages
# Save FAISS index and metadata to JSON
index_path = 'vector_indexNLP.faiss'
metadata_path = 'metadataNLP.json'
# Convert embeddings to numpy array for FAISS
embeddings_array = np.array(embeddings, dtype='float32')
# Initialize FAISS index
dimension = embeddings_array.shape[1] # Dimension of the embeddings
index = faiss.IndexFlatL2(dimension)
# Add embeddings in batches to avoid memory issues when building the index.
batch_size = 1000  # Adjust batch size based on available memory
for i in range(0, len(embeddings), batch_size):
    batch_embeddings = embeddings_array[i:i+batch_size]
    index.add(batch_embeddings)
# Save the FAISS index
faiss.write_index(index, index_path)
# Save metadata
with open(metadata_path, 'w') as f:
    json.dump(metadata, f)
print(f"FAISS index saved to: {index_path}")
print(f"Metadata saved to: {metadata_path}")