tykiww committed on
Commit
a5f561a
·
verified ·
1 Parent(s): f9b358b

Create transcript_loader.py

Browse files
Files changed (1) hide show
  1. utilities/transcript_loader.py +114 -0
utilities/transcript_loader.py ADDED
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import webvtt
3
+ import re
4
+ from datetime import datetime
5
+ from llama_index import Document
6
+
7
+
8
class VTTTranscriptLoader:
    """
    VTT file ingestion and cleaning.

    This was done because vtt files are not recognized by llamaindex. The
    output should mirror that of any document loader from llamaindex or
    langchain: ``load()`` returns a list of ``Document`` objects, one per
    ``.vtt`` file, each carrying the "speaker: 'words'" transcript as text
    and a metadata dict (title, duration, meeting_date, speakers).
    """

    # Voice tag that names the speaker of a caption, e.g. "<v Alice>".
    _SPEAKER_TAG = re.compile(r"<v (.*?)>")
    # Date embedded in a path, e.g. "2024-01-05" or "2024_01_05".
    _DATE_IN_PATH = re.compile(r"\d{4}[-_]\d{2}[-_]\d{2}")
    # Timestamp layout used for caption start/end values.
    _TIME_FORMAT = "%H:%M:%S.%f"

    def __init__(self, file_path):
        """
        Args:
            file_path: Path to a single ``.vtt`` file, or to a directory
                that is searched recursively for ``.vtt`` files.
        """
        self.fp = file_path
        self.data = None

    def open_vtt(self, file_path, plaintext=True):
        """
        Read a VTT file.

        Args:
            file_path: Path of the file to read.
            plaintext: When true, return the raw lines of the file;
                otherwise return the result of ``webvtt.read``.

        Returns:
            A list of text lines (``plaintext=True``) or the parsed
            captions object produced by ``webvtt.read``.
        """
        if plaintext:
            # WebVTT files are UTF-8 by specification; do not depend on the
            # platform's default locale encoding.
            with open(file_path, "r", encoding="utf-8") as f:
                return f.readlines()
        return webvtt.read(file_path)

    def extract_speaker_name(self, text):
        """
        Extract the speaker name from a VTT caption line.

        Args:
            text: One line of raw VTT text.

        Returns:
            The text inside the first ``<v ...>`` tag, or ``None`` when the
            line contains no such tag.
        """
        match = self._SPEAKER_TAG.search(text)
        return match.group(1) if match else None

    def extract_speaker_words(self, captions):
        """
        Extract the spoken text from VTT captions.

        Args:
            captions: Iterable of objects exposing a ``text`` attribute.

        Returns:
            A list with the ``text`` of every caption, in order.
        """
        return [caption.text for caption in captions]

    def merge_speaker_words(self, words, speakers, split=True):
        """
        Join speaker names with their words.

        Args:
            words: Captions (objects with a ``text`` attribute).
            speakers: Raw text lines of the same transcript; only lines
                containing a ``<v ...>`` tag contribute a speaker name.
            split: When true, return ``(name, text)`` tuples; when false,
                return a single newline-joined string of
                ``name: 'text'`` entries.

        Returns:
            A ``(combined, speaker_list)`` tuple, where ``combined`` is the
            list of pairs or the joined string (see ``split``) and
            ``speaker_list`` holds one name per tagged line, duplicates
            included.
        """
        # Run the regex once per line; drop lines without a (non-empty) name.
        speaker_list = [
            name
            for name in map(self.extract_speaker_name, speakers)
            if name
        ]
        words_list = self.extract_speaker_words(words)
        # NOTE(review): pairing is positional and zip() stops at the shorter
        # list, so a caption without a <v ...> tag would shift the pairing --
        # confirm every caption in the input carries a voice tag.
        combined_list = list(zip(speaker_list, words_list))
        if not split:
            combined_list = '\n'.join(
                [f"{name}: '{text}'" for name, text in combined_list]
            )
        return combined_list, speaker_list

    @classmethod
    def _duration_minutes(cls, start, end):
        """Return the minutes elapsed between two caption timestamps."""
        begin = datetime.strptime(start, cls._TIME_FORMAT)
        finish = datetime.strptime(end, cls._TIME_FORMAT)
        # total_seconds() keeps the sub-second part that .seconds drops.
        return (finish - begin).total_seconds() / 60

    @classmethod
    def _meeting_date(cls, file_path):
        """Return the date found in *file_path* as "YYYY-MM-DD", or None."""
        match = cls._DATE_IN_PATH.search(file_path)
        if not match:
            return None
        date_str = match.group().replace('_', '-')
        try:
            date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
        except ValueError:
            # Digits matched the pattern but are not a real calendar date.
            return None
        return date_obj.strftime("%Y-%m-%d")

    def get_metadata(self, speaker_list, file_path):
        """
        Generate metadata for the transcript.

        Args:
            speaker_list: Speaker names, possibly with duplicates.
            file_path: Path of the transcript; it is re-read to obtain the
                caption timestamps and searched for an embedded date.

        Returns:
            A dict with keys ``title`` (the file path), ``duration``
            (minutes between the first caption's start and the last
            caption's end; ``0.0`` when there are no captions),
            ``meeting_date`` ("YYYY-MM-DD" string, or ``None`` when the
            path holds no valid date) and ``speakers`` (unique names in
            order of first appearance).
        """
        sess = self.open_vtt(file_path, plaintext=False)
        if len(sess):
            minutes = self._duration_minutes(sess[0].start, sess[-1].end)
        else:
            # No captions: there is no first/last timestamp to subtract.
            minutes = 0.0

        return {
            'title': file_path,
            'duration': minutes,
            'meeting_date': self._meeting_date(file_path),
            # dict.fromkeys de-duplicates while keeping a stable order.
            'speakers': list(dict.fromkeys(speaker_list)),
        }

    def manual_document(self, output, metadata):
        """
        Create a document manually.

        Args:
            output: Transcript text for the document body.
            metadata: Metadata dict to attach to the document.

        Returns:
            A ``Document`` whose text is *output* and whose ``metadata``
            attribute is *metadata*.
        """
        document = Document(text=output)
        document.metadata = metadata
        return document

    def process_file(self, file_path):
        """
        Process a single VTT file.

        Args:
            file_path: Path of the ``.vtt`` file.

        Returns:
            A ``Document`` holding the combined speaker names and words,
            with the transcript metadata attached.
        """
        # Words come from the parsed captions, speaker tags from raw lines.
        words = self.open_vtt(file_path, plaintext=False)
        speaker = self.open_vtt(file_path, plaintext=True)
        output, speaker_list = self.merge_speaker_words(
            words, speaker, split=False
        )
        metadata = self.get_metadata(speaker_list, file_path)
        return self.manual_document(output, metadata)

    def load(self):
        """
        Process every VTT file under ``self.fp``.

        When ``self.fp`` is a directory it is walked recursively and each
        file ending in ``.vtt`` is processed; otherwise ``self.fp`` itself
        is processed as a single file.

        Returns:
            A list of ``Document`` objects, one per processed file.
        """
        if not os.path.isdir(self.fp):
            return [self.process_file(self.fp)]

        results = []
        for root, _, files in os.walk(self.fp):
            for name in files:
                if name.endswith('.vtt'):
                    results.append(
                        self.process_file(os.path.join(root, name))
                    )
        return results
114
+