Spaces:
Sleeping
Sleeping
#!/usr/bin/python3 | |
import sys | |
import pickle | |
import torch | |
from huggingface_hub import hf_hub_download | |
from .Data import Data | |
from .RNNTagger import RNNTagger | |
from .CRFTagger import CRFTagger | |
########################################################################### | |
# main function | |
########################################################################### | |
class Args: | |
def __init__(self, path_param, model_id, path_data, crf_beam_size, gpu, min_prob, print_probs) -> None: | |
self.path_param = path_param | |
self.model_id = model_id | |
self.path_data = path_data | |
self.crf_beam_size = crf_beam_size | |
self.gpu = gpu | |
self.min_prob = min_prob | |
self.print_probs = print_probs | |
# if __name__ == "__main__": | |
def annotate(tokens, path_param='MHGTagger/tagger', model_id='nielklug/rnn_tagger', path_data='', crf_beam_size=10, gpu=-1, min_prob=-1.0, print_probs=True): | |
# parser = argparse.ArgumentParser(description='Annotation program of the RNN-Tagger.') | |
# parser.add_argument('path_param', type=str, | |
# help='name of parameter file') | |
# parser.add_argument('path_data', type=str, | |
# help='name of the file with input data') | |
# parser.add_argument('--crf_beam_size', type=int, default=10, | |
# help='size of the CRF beam (if the system contains a CRF layer)') | |
# parser.add_argument('--gpu', type=int, default=0, | |
# help='selection of the GPU. The default is: 0 (CPU=-1)') | |
# parser.add_argument("--min_prob", type=float, default=-1.0, | |
# help="print all tags whose probability exceeds the probability of the best tag times this threshold") | |
# parser.add_argument("--print_probs", action="store_true", default=False, | |
# help="print the tag probabilities") | |
args = Args(path_param, model_id, path_data, crf_beam_size, gpu, min_prob, print_probs) | |
# Select the processing device | |
if args.gpu >= 0: | |
if not torch.cuda.is_available(): | |
print('No gpu available. Using cpu instead.', file=sys.stderr) | |
args.gpu = -1 | |
else: | |
if args.gpu >= torch.cuda.device_count(): | |
print('gpu '+str(args.gpu)+' not available. Using gpu 0 instead.', file=sys.stderr) | |
args.gpu = 0 | |
torch.cuda.set_device(args.gpu) | |
device = torch.device('cuda' if args.gpu >= 0 else 'cpu') | |
# load parameters | |
data = Data(args.path_param+'.io') # read the symbol mapping tables | |
with open(args.path_param+'.hyper', 'rb') as file: | |
hyper_params = pickle.load(file) | |
model = CRFTagger(*hyper_params) if len(hyper_params)==10 \ | |
else RNNTagger(*hyper_params) | |
model_file = hf_hub_download(repo_id=args.model_id, filename='tagger.rnn') | |
model.load_state_dict(torch.load(model_file, | |
map_location=torch.device('cpu'))) | |
model = model.to(device) | |
if type(model) is CRFTagger: | |
for optvar, option in zip((args.min_prob, args.print_probs), | |
("min_prob","print_probs")): | |
if optvar: | |
print(f"Warning: Option --{option} is ignored because the model has a CRF output layer", file=sys.stderr) | |
model.eval() | |
with torch.no_grad(): | |
for i, words in enumerate(data.single_sentences(tokens)): | |
# print(i, end='\r', file=sys.stderr, flush=True) | |
# map words to numbers and create Torch variables | |
fwd_charIDs, bwd_charIDs = data.words2charIDvec(words) | |
fwd_charIDs = torch.LongTensor(fwd_charIDs).to(device) | |
bwd_charIDs = torch.LongTensor(bwd_charIDs).to(device) | |
words_all = [] | |
tagged = [] | |
probs_all = [] | |
# run the model | |
if type(model) is RNNTagger: | |
tagscores = model(fwd_charIDs, bwd_charIDs) | |
if args.min_prob == -1.0: | |
# only print the word and tag with the highest score | |
tagIDs = tagscores.argmax(-1) | |
tags = data.IDs2tags(tagIDs.to("cpu")) | |
if not args.print_probs: | |
for word, tag in zip(words, tags): | |
# print(word, tag, sep="\t") | |
words_all.append(word) | |
tagged.append(tag) | |
else: | |
# print probabilities as well | |
tagprobs = torch.nn.functional.softmax(tagscores, dim=-1) | |
# get the probabilities of the highest-scoring tags | |
probs = tagprobs[range(len(tagIDs)), tagIDs].to("cpu").tolist() | |
# print the result | |
for word, tag, prob in zip(words, tags, probs): | |
# print(word, tag, round(float(prob), 4), sep="\t") | |
words_all.append(word) | |
tagged.append(tag) | |
probs_all.append(round(float(prob), 4)) | |
else: | |
# print the best tags for each word | |
tagprobs = torch.nn.functional.softmax(tagscores, dim=-1) | |
# get the most probable tag and its probability | |
best_probs, _ = tagprobs.max(-1) | |
# get all tags with a probability above best_prob * min_prob | |
thresholds = best_probs * args.min_prob | |
greaterflags = (tagprobs > thresholds.unsqueeze(1)) | |
for word, flags, probs in zip(words, greaterflags, tagprobs): | |
# get the IDs of the best tags | |
IDs = flags.nonzero() | |
# get the best tags and their probabilities | |
best_probs = probs[IDs].to("cpu") | |
best_tags = data.IDs2tags(IDs.to("cpu")) | |
# sort the tags by decreasing probability | |
sorted_list = sorted(zip(best_tags, best_probs), key=lambda x:-x[1]) | |
best_tags, best_probs = zip(*sorted_list) | |
# generate the output | |
if args.print_probs: | |
# append the probabilities to the tags | |
best_tags = [f"{t} {float(p):.4f}" for t, p in zip(best_tags, best_probs)] | |
print(word, ' '.join(best_tags), sep="\t") | |
elif type(model) is CRFTagger: | |
tagIDs = model(fwd_charIDs, bwd_charIDs) | |
tags = data.IDs2tags(tagIDs) | |
for word, tag in zip(words, tags): | |
print(word, tag, sep='\t') | |
else: | |
sys.exit('Error') | |
return (words_all, tagged, probs_all) | |