Spaces:
Sleeping
Sleeping
# Imports for Transcript Loader | |
import os | |
import webvtt | |
import re | |
from datetime import datetime | |
from llama_index import Document | |
# Imports for Document Embedder | |
import gc | |
import re | |
from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
from pinecone.grpc import PineconeGRPC | |
from pinecone import ServerlessSpec | |
from llama_index.vector_stores import PineconeVectorStore | |
from llama_index.node_parser import SemanticSplitterNodeParser | |
from llama_index.ingestion import IngestionPipeline | |
class VTTTranscriptLoader: | |
""" | |
vtt file ingestion and cleaning. This was done because vtt files | |
are not recognized by llamaindex. The output should mirror that of | |
any document loader from llamaindex or langchain. | |
""" | |
def __init__(self, file_path): | |
self.fp = file_path | |
self.data = None | |
def open_vtt(self, file_path, plaintext=True): | |
"""Read VTT file.""" | |
if plaintext: | |
with open(file_path, "r") as f: | |
data = f.readlines() | |
else: | |
data = webvtt.read(file_path) | |
return data | |
def extract_speaker_name(self, text): | |
"""Extracts the speaker name from a VTT caption.""" | |
match = re.search(r"<v (.*?)>", text) | |
if match: | |
return match.group(1) | |
else: | |
return None | |
def extract_speaker_words(self, captions): | |
"""Extracts the speaker text from a VTT caption.""" | |
return [caption.text for caption in captions] | |
def merge_speaker_words(self, words, speakers, split=True): | |
"""Joins speaker names with their words.""" | |
# Extract speaker names | |
speaker_list = [self.extract_speaker_name(line) for line in speakers if self.extract_speaker_name(line)] | |
# Extract words | |
words_list = self.extract_speaker_words(words) | |
# Combine speaker names and words | |
combined_list = list(zip(speaker_list, words_list)) | |
# Return the combined list as a single string if split is False | |
if not split: | |
combined_list = '\n'.join([f"{name}: '{text}'" for name, text in combined_list]) | |
return combined_list, speaker_list | |
def get_metadata(self, speaker_list, file_path): | |
"""Generates metadata for the transcript.""" | |
# Meeting length | |
time_format = "%H:%M:%S.%f" | |
sess = self.open_vtt(file_path, plaintext=False) | |
dt1 = datetime.strptime(sess[0].start, time_format) | |
dt2 = datetime.strptime(sess[-1].end, time_format) | |
minutes = (dt2 - dt1).seconds / 60 | |
# Meeting date | |
match = re.search(r"\d{4}[-_]\d{2}[-_]\d{2}", file_path) | |
if match: | |
date_str = match.group().replace('_', '-') | |
date_obj = datetime.strptime(date_str, "%Y-%m-%d").date() | |
else: | |
date_obj = None | |
# Pull dictionary here | |
output = { | |
'title': file_path, | |
'duration': minutes, | |
'meeting_date': date_obj.strftime("%Y-%m-%d"), | |
'speakers': list(set(speaker_list)), | |
} | |
return output | |
def manual_document(self, output, metadata): | |
"""Create document manually""" | |
document = Document(text=output) | |
document.metadata = metadata | |
return document | |
def process_file(self, file_path): | |
"""Processes a single VTT file and returns the combined speaker names and words.""" | |
# Get words as webvtt captions | |
words = self.open_vtt(file_path, plaintext=False) | |
# Get speaker lines as plaintext | |
speaker = self.open_vtt(file_path, plaintext=True) | |
# Combine speaker names and words | |
output, speaker_list = self.merge_speaker_words(words, speaker, split=False) | |
# Get session data as dictionary | |
metadata = self.get_metadata(speaker_list, file_path) | |
return self.manual_document(output, metadata) | |
def load(self): | |
"""Processes all VTT files in the given list of directories or files and returns a list of results.""" | |
results = [] | |
for path in self.fp: | |
if os.path.isdir(path): | |
for root, _, files in os.walk(path): | |
for file in files: | |
if file.endswith('.vtt'): | |
file_path = os.path.join(root, file) | |
transcript = self.process_file(file_path) | |
results.append(transcript) | |
else: | |
if path.endswith('.vtt'): | |
transcript = self.process_file(path) | |
results.append(transcript) | |
return results | |
class DocumentEmbedder: | |
""" | |
Takes a document and embeds it directly into a pinecone data store. | |
Process retrieves, cleans, embeds, and sends the documents to vector | |
store. | |
Currently supports hugginface embeddings only. Gotta keep things cheap. | |
""" | |
def __init__(self, api_keys, files, embedding, index_name): | |
# api keys | |
self.pinecone_api_key = api_keys['pinecone'] | |
self.openai_api_key = api_keys['openai'] | |
self.huggingface_api_key = api_keys['huggingface'] | |
# pinecone | |
self.embedding = embedding | |
self.vector_db = index_name | |
# basic items | |
self.files = files | |
#self.interactive = interactive | |
def clean_text(self, content: str) -> str: | |
""" | |
Remove unwanted characters and patterns in text input. | |
:param content: Text input. | |
:return: Cleaned version of original text input. | |
""" | |
# Fix hyphenated words broken by newline | |
content = re.sub(r'(\w+)-\n(\w+)', r'\1\2', content) | |
# Remove specific unwanted patterns and characters | |
unwanted_patterns = [ | |
"\\n", " β", "ββββββββββ", "βββββββββ", "βββββ", | |
r'\\u[\dA-Fa-f]{4}', r'\uf075', r'\uf0b7' | |
] | |
for pattern in unwanted_patterns: | |
content = re.sub(pattern, "", content) | |
# Fix improperly spaced hyphenated words and normalize whitespace | |
content = re.sub(r'(\w)\s*-\s*(\w)', r'\1-\2', content) | |
content = re.sub(r'\s+', ' ', content) | |
return content | |
def create_embedder(self): | |
"""Get the right embedding model""" | |
embedding = HuggingFaceEmbedding(model_name=self.embedding) | |
return embedding | |
def pinecone_pipeline(self, embedding): | |
"""Initialize pinecone connection and vectorstore""" | |
# connect | |
pc = PineconeGRPC(api_key=self.pinecone_api_key) | |
# Create your index if index does not exist | |
indexes = [i.name for i in pc.list_indexes()] | |
index_exists = any([self.vector_db in i for i in indexes]) | |
if index_exists: | |
print("Index already exists") | |
else: | |
print("Creating index") | |
pc.create_index( | |
self.vector_db, | |
dimension=768, | |
metric="cosine", | |
spec=ServerlessSpec(cloud="aws", region="us-east-1"), | |
) | |
# Initialize your index | |
pinecone_index = pc.Index(self.vector_db) | |
# Initialize VectorStore | |
vector_store = PineconeVectorStore(pinecone_index=pinecone_index) | |
# create pipeline (abstracts away the need to adaptively process and batch) | |
pipeline = IngestionPipeline( | |
transformations=[ | |
# creating appropriate chunks and cutoffs (this needs to be worked on). | |
SemanticSplitterNodeParser( | |
buffer_size=10, # 1 = each sentence is a node | |
breakpoint_percentile_threshold=95, | |
embed_model=embedding, | |
), | |
embedding, | |
], | |
vector_store=vector_store | |
) | |
return pipeline | |
def embed(self): | |
"""stringing process above to embed and upsert directly to pinecone""" | |
# read_file | |
print("reading files") | |
results = self.files | |
# Call clean function | |
print("cleaning files") | |
for d in range(len(results)): | |
results[d].text = self.clean_text(results[d].text) | |
# set up embedder | |
print("retrieving embedder") | |
embedder = self.create_embedder() | |
# set up pinecone pipeline | |
print("initializing pinecone db") | |
pipeline = self.pinecone_pipeline(embedder) | |
# run pinecone in batches (of 1) for memory preservation. | |
print("reading into pinecone db") | |
batchsize = 1 | |
for i in range(0, len(results), batchsize): | |
gc.collect() | |
batch = pipeline.run(documents=results[i:i+batchsize]) | |
print("completed batch %s" % ((i+batchsize)/batchsize)) | |