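# Source: Hugging Face repository "multi-meeting-QnA", utilities/ directory.
# Page metadata captured with the file: uploaded by tykiww,
# commit 9bf679f ("Update utilities/", verified), file size 8.83 kB.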
# Imports for Transcript Loader
import os
import webvtt
import re
from datetime import datetime
from llama_index import Document
# Imports for Document Embedder
import gc
import re
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pinecone.grpc import PineconeGRPC
from pinecone import ServerlessSpec
from llama_index.vector_stores import PineconeVectorStore
from llama_index.node_parser import SemanticSplitterNodeParser
from llama_index.ingestion import IngestionPipeline
class VTTTranscriptLoader:
    """
    vtt file ingestion and cleaning. This was done because vtt files
    are not recognized by llamaindex. The output should mirror that of
    any document loader from llamaindex or langchain.

    Usage: ``VTTTranscriptLoader(paths).load()`` returns one ``Document``
    per ``.vtt`` file found, with the text formatted as ``Speaker: 'words'``
    lines and the metadata produced by :meth:`get_metadata`.
    """

    def __init__(self, file_path):
        """
        :param file_path: A single path, or an iterable of paths. Each path
            may be a ``.vtt`` file or a directory that is walked recursively.
        """
        # Keep the caller's value untouched; `load` normalizes it.
        self.fp = file_path
        # Not populated by any method of this class.
        self.data = None

    def open_vtt(self, file_path, plaintext=True):
        """Read VTT file.

        :param file_path: Path of the ``.vtt`` file.
        :param plaintext: If True return the raw lines of the file,
            otherwise return the captions parsed by ``webvtt.read``.
        :return: List of lines, or the parsed caption container.
        """
        if plaintext:
            # Explicit encoding so the result does not depend on the locale.
            with open(file_path, "r", encoding="utf-8") as f:
                return f.readlines()
        return webvtt.read(file_path)

    def extract_speaker_name(self, text):
        """Extracts the speaker name from a VTT caption.

        :param text: One raw line of the VTT file.
        :return: The content of the ``<v ...>`` voice tag, or None when the
            line carries no such tag.
        """
        match = re.search(r"<v (.*?)>", text)
        if match:
            return match.group(1)
        return None

    def extract_speaker_words(self, captions):
        """Extracts the speaker text from a VTT caption.

        :param captions: Iterable of caption objects exposing ``.text``.
        :return: List with the text of every caption, in order.
        """
        return [caption.text for caption in captions]

    def merge_speaker_words(self, words, speakers, split=True):
        """Joins speaker names with their words.

        :param words: Parsed captions (objects exposing ``.text``).
        :param speakers: Raw lines of the same file, scanned for voice tags.
        :param split: If True return a list of ``(speaker, text)`` tuples,
            otherwise a single newline-joined string of ``name: 'text'``.
        :return: Tuple ``(combined, speaker_list)``.
        """
        # Extract speaker names (one regex search per line; untagged lines
        # yield None and are dropped).
        speaker_list = [
            name for name in map(self.extract_speaker_name, speakers) if name
        ]
        # Extract words
        words_list = self.extract_speaker_words(words)
        # Combine speaker names and words. Pairing is positional, so zip
        # stops at the shorter of the two sequences.
        combined_list = list(zip(speaker_list, words_list))
        # Return the combined list as a single string if split is False
        if not split:
            combined_list = '\n'.join(
                [f"{name}: '{text}'" for name, text in combined_list]
            )
        return combined_list, speaker_list

    def get_metadata(self, speaker_list, file_path, captions=None):
        """Generates metadata for the transcript.

        :param speaker_list: Speaker names found in the transcript
            (duplicates allowed).
        :param file_path: Path of the transcript; also searched for a
            ``YYYY-MM-DD`` / ``YYYY_MM_DD`` meeting date.
        :param captions: Optional already-parsed captions. When omitted the
            file is parsed here, which is the original behaviour.
        :return: Dict with keys ``title``, ``duration`` (minutes),
            ``meeting_date`` (``YYYY-MM-DD`` string, empty when the path
            holds no valid date) and ``speakers`` (sorted unique names).
        """
        if captions is None:
            captions = self.open_vtt(file_path, plaintext=False)

        # Meeting length: first caption start to last caption end.
        time_format = "%H:%M:%S.%f"
        if len(captions) > 0:
            dt1 = datetime.strptime(captions[0].start, time_format)
            dt2 = datetime.strptime(captions[-1].end, time_format)
            minutes = (dt2 - dt1).total_seconds() / 60
        else:
            # An empty transcript has no duration instead of an IndexError.
            minutes = 0.0

        # Meeting date. The field stays a string in every case so the
        # metadata has one type whether or not a date was found.
        meeting_date = ""
        match = re.search(r"\d{4}[-_]\d{2}[-_]\d{2}", os.fspath(file_path))
        if match:
            date_str = match.group().replace('_', '-')
            try:
                date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
                meeting_date = date_obj.strftime("%Y-%m-%d")
            except ValueError:
                # Digits matched the shape but are not a real calendar date.
                meeting_date = ""

        # Pull dictionary here
        output = {
            'title': file_path,
            'duration': minutes,
            'meeting_date': meeting_date,
            # sorted for a reproducible order across runs
            'speakers': sorted(set(speaker_list)),
        }
        return output

    def manual_document(self, output, metadata):
        """Create document manually

        :param output: Transcript text.
        :param metadata: Metadata dict attached to the document.
        :return: ``Document`` holding the text and metadata.
        """
        document = Document(text=output)
        document.metadata = metadata
        return document

    def process_file(self, file_path):
        """Processes a single VTT file and returns the combined speaker names and words.

        :param file_path: Path of the ``.vtt`` file.
        :return: ``Document`` for the transcript.
        """
        # Get words as webvtt captions
        words = self.open_vtt(file_path, plaintext=False)
        # Get speaker lines as plaintext
        speaker = self.open_vtt(file_path, plaintext=True)
        # Combine speaker names and words
        output, speaker_list = self.merge_speaker_words(words, speaker, split=False)
        # Get session data as dictionary (reuse the parsed captions so the
        # file is not parsed a second time)
        metadata = self.get_metadata(speaker_list, file_path, captions=words)
        return self.manual_document(output, metadata)

    def load(self):
        """Processes all VTT files in the given list of directories or files and returns a list of results.

        :return: List of documents, one per ``.vtt`` file. Paths that are
            neither a directory nor a ``.vtt`` file are ignored.
        """
        paths = self.fp
        # A bare string would otherwise be iterated character by character.
        if isinstance(paths, (str, os.PathLike)):
            paths = [paths]

        results = []
        for path in paths or []:
            path = os.fspath(path)
            if os.path.isdir(path):
                for root, _, files in os.walk(path):
                    # sorted: os.walk gives no ordering guarantee for files
                    for file in sorted(files):
                        if file.endswith('.vtt'):
                            file_path = os.path.join(root, file)
                            results.append(self.process_file(file_path))
            elif path.endswith('.vtt'):
                results.append(self.process_file(path))
        return results
class DocumentEmbedder:
    """
    Takes a document and embeds it directly into a pinecone data store.
    Process retrieves, cleans, embeds, and sends the documents to vector
    store.

    Currently supports huggingface embeddings only. Gotta keep things cheap.
    """

    def __init__(self, api_keys, files, embedding, index_name):
        """
        :param api_keys: Mapping of service name to key. ``'pinecone'`` is
            required; ``'openai'`` and ``'huggingface'`` are stored when
            present but are not used by the methods of this class.
        :param files: List of documents (objects with a ``.text`` attribute).
        :param embedding: Hugging Face model name for the embedder.
        :param index_name: Name of the Pinecone index to write to.
        """
        # api keys
        self.pinecone_api_key = api_keys['pinecone']
        self.openai_api_key = api_keys.get('openai')
        self.huggingface_api_key = api_keys.get('huggingface')
        # pinecone
        self.embedding = embedding
        self.vector_db = index_name
        # basic items
        self.files = files

    def clean_text(self, content: str) -> str:
        """
        Remove unwanted characters and patterns in text input.

        :param content: Text input.
        :return: Cleaned version of original text input.
        """
        # Fix hyphenated words broken by newline
        content = re.sub(r'(\w+)-\n(\w+)', r'\1\2', content)

        # Remove specific unwanted patterns and characters. Applied in
        # order, so longer dash runs are stripped before shorter ones.
        # Remaining newlines are intentionally NOT deleted here: the
        # whitespace normalization below turns them into single spaces,
        # which keeps the words on either side apart.
        unwanted_patterns = [
            " \u2014", "\u2014" * 10, "\u2014" * 9, "\u2014" * 5,
            r'\\u[\dA-Fa-f]{4}', r'\uf075', r'\uf0b7',
        ]
        for pattern in unwanted_patterns:
            content = re.sub(pattern, "", content)

        # Fix improperly spaced hyphenated words and normalize whitespace
        content = re.sub(r'(\w)\s*-\s*(\w)', r'\1-\2', content)
        content = re.sub(r'\s+', ' ', content)
        return content

    def create_embedder(self):
        """Get the right embedding model

        :return: ``HuggingFaceEmbedding`` built from ``self.embedding``.
        """
        embedding = HuggingFaceEmbedding(model_name=self.embedding)
        return embedding

    def pinecone_pipeline(self, embedding):
        """Initialize pinecone connection and vectorstore

        :param embedding: Embedding model used both for semantic splitting
            and for embedding the resulting nodes.
        :return: ``IngestionPipeline`` that writes into the Pinecone index.
        """
        # connect
        pc = PineconeGRPC(api_key=self.pinecone_api_key)

        # Create your index if index does not exist. Exact name comparison:
        # a substring test would mistake "meet" for an existing "meetings".
        indexes = [i.name for i in pc.list_indexes()]
        index_exists = self.vector_db in indexes
        if index_exists:
            print("Index already exists")
        else:
            print("Creating index")
            # NOTE(review): the original dimension/metric arguments are not
            # visible in this file; the dimension is taken from the model
            # itself so it always matches the vectors being upserted.
            dimension = len(embedding.get_text_embedding("dimension probe"))
            pc.create_index(
                self.vector_db,
                dimension=dimension,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )

        # Initialize your index
        pinecone_index = pc.Index(self.vector_db)

        # Initialize VectorStore
        vector_store = PineconeVectorStore(pinecone_index=pinecone_index)

        # create pipeline (abstracts away the need to adaptively process and batch)
        pipeline = IngestionPipeline(
            transformations=[
                # creating appropriate chunks and cutoffs (this needs to be worked on).
                SemanticSplitterNodeParser(
                    buffer_size=10,  # 1 = each sentence is a node
                    embed_model=embedding,
                ),
                embedding,
            ],
            vector_store=vector_store,
        )
        return pipeline

    def embed(self):
        """stringing process above to embed and upsert directly to pinecone

        Cleans every document in ``self.files`` in place, then runs them
        through the ingestion pipeline one batch at a time.
        """
        # read_file
        print("reading files")
        results = self.files

        # Call clean function
        print("cleaning files")
        for document in results:
            document.text = self.clean_text(document.text)

        # set up embedder
        print("retrieving embedder")
        embedder = self.create_embedder()

        # set up pinecone pipeline
        print("initializing pinecone db")
        pipeline = self.pinecone_pipeline(embedder)

        # run pinecone in batches (of 1) for memory preservation.
        print("reading into pinecone db")
        batchsize = 1
        for i in range(0, len(results), batchsize):
            pipeline.run(documents=results[i:i + batchsize])
            print("completed batch %s" % (i // batchsize + 1))
            # Drop per-batch garbage before the next batch is embedded.
            gc.collect()