Spaces:
Runtime error
Runtime error
File size: 4,725 Bytes
27880c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import nltk
import pytextrank
import re
from operator import itemgetter
import en_core_web_sm
class KeywordExtractor:
    """
    Keyword extraction on text data.

    Attributes:
        nlp: The pipeline returned by ``en_core_web_sm.load()`` with the
            "textrank" component added (the component is presumably
            registered by the file-level ``pytextrank`` import).
    """

    def __init__(self):
        self.nlp = en_core_web_sm.load()
        self.nlp.add_pipe("textrank")

    def get_keywords(self, text, max_keywords):
        """
        Extract keywords from text.

        Parameters:
            text (str): The user input string to extract keywords from
            max_keywords (int): Limit on number of keywords to return
        Returns:
            kws (list): text of the first ``max_keywords`` phrases found
                in ``doc._.phrases``
        """
        doc = self.nlp(text)
        return [phrase.text for phrase in doc._.phrases[:max_keywords]]

    def get_keyword_indices(self, kws, text):
        """
        Locate every occurrence of each keyword in text.

        Matching is literal (keywords are regex-escaped) and case-sensitive.

        Parameters:
            kws (list): list of extracted keywords
            text (str): The user input string the keywords are searched in
        Returns:
            keyword_indices (list): list of ``[start, end]`` pairs, where
                ``end`` is exclusive, one per match, grouped by keyword
        """
        keyword_indices = []
        for kw in kws:
            # An empty pattern would produce a zero-width match at every
            # position in the text, so it carries no information.
            if not kw:
                continue
            pattern = re.escape(kw)
            keyword_indices.extend(
                [m.start(), m.end()] for m in re.finditer(pattern, text)
            )
        return keyword_indices

    def merge_overlapping_indices(self, keyword_indices):
        """
        Merge overlapping keyword indices.

        Two intervals are merged when the second starts inside the first,
        directly at its end, or exactly one position after its end (so two
        keywords separated by a single character become one span).

        The input is not modified; a new list of new pairs is returned.

        Parameters:
            keyword_indices (list): list of indices for keyword boundaries in text
        Returns:
            keyword_indices (list): list of indices for keyword boundaries
                with overlapping intervals combined, sorted by start
        """
        if not keyword_indices:
            return []
        # Work on a sorted copy (including copies of the pairs) so neither
        # the caller's list nor its inner pairs are mutated by the merge.
        ordered = sorted(list(pair) for pair in keyword_indices)
        merged = [ordered[0]]
        for current in ordered[1:]:
            last = merged[-1]
            overlaps = last[0] <= current[0] <= last[-1]
            one_apart = last[-1] == current[0] - 1
            if overlaps or one_apart:
                last[-1] = max(last[-1], current[-1])
            else:
                merged.append(current)
        return merged

    def merge_until_finished(self, keyword_indices):
        """
        Loop until no overlapping keyword indices are left.

        Each pass is fed the result of the previous pass; the loop stops as
        soon as a pass no longer reduces the number of intervals.

        Parameters:
            keyword_indices (list): list of indices for keyword boundaries in text
        Returns:
            keyword_indices (list): list of indices for keyword boundaries
                with overlapping intervals combined, sorted by start
                (empty list for empty input)
        """
        merged = self.merge_overlapping_indices(keyword_indices)
        while True:
            again = self.merge_overlapping_indices(merged)
            # If merging did not reduce the list, these are the final indices
            if len(again) == len(merged):
                return sorted(again, key=itemgetter(0))
            merged = again

    def get_annotation(self, text, keyword_indices):
        """
        Create text annotation for extracted keywords.

        The text is cut by slicing at the interval boundaries, so the
        result does not depend on what characters the text contains.

        Parameters:
            text (str): The user input string the indices refer to
            keyword_indices (list): list of ``[start, end]`` pairs (end
                exclusive); expected to be non-overlapping, e.g. the output
                of ``merge_until_finished``
        Returns:
            annotation (list): alternating plain strings and
                ``(keyword_text, "KEY", "#26aaef")`` tuples for generating
                html; the plain strings before the first and after the last
                keyword are always present and may be empty
        """
        annotation = []
        cursor = 0
        for span in sorted(keyword_indices):
            # Clamp to the cursor so a stray overlapping span cannot emit
            # the same characters twice.
            start = max(span[0], cursor)
            end = span[1]
            if end < start:
                continue
            annotation.append(text[cursor:start])
            annotation.append((text[start:end], "KEY", "#26aaef"))
            cursor = end
        # Trailing plain text after the last keyword
        annotation.append(text[cursor:])
        return annotation

    def generate(self, text, max_keywords):
        """
        Extract keywords and create the text annotation for them.

        Parameters:
            text (str): The user input string to extract keywords from
            max_keywords (int): Limit on number of keywords to generate
        Returns:
            annotation (list): list of strings and tuples for generating
                html, or None when no keyword occurrence was found in text
            kws (list): list of extracted keywords
        """
        kws = self.get_keywords(text, max_keywords)
        indices = self.get_keyword_indices(kws, text)
        if indices:
            indices_merged = self.merge_until_finished(indices)
            annotation = self.get_annotation(text, indices_merged)
        else:
            annotation = None
        return annotation, kws
|