|
""" |
|
Utilities for Hengam inference.
|
""" |
|
|
|
"""### Import Libraries""" |
|
|
|
|
|
import os |
|
import pandas as pd |
|
from tqdm import tqdm |
|
import numpy as np |
|
import json |
|
|
|
|
|
from seqeval.metrics import accuracy_score, precision_score, recall_score, f1_score |
|
from seqeval.scheme import IOB2 |
|
|
|
|
|
import torch |
|
from torch.utils.data import DataLoader |
|
from torch.utils.data import Dataset |
|
from torch.nn.utils.rnn import pad_sequence |
|
import torch.nn as nn |
|
|
|
|
|
import pytorch_lightning as pl |
|
from torchcrf import CRF as SUPERCRF |
|
|
|
|
|
import nltk |
|
from nltk.tokenize import RegexpTokenizer |
|
|
|
|
|
from transformers import XLMRobertaTokenizerFast |
|
from transformers import XLMRobertaModel, XLMRobertaConfig |
|
|
|
|
|
|
|
from typing import Union, Dict, List, Tuple, Any, Optional |
|
|
|
import glob |
|
|
|
|
|
nltk.download('punkt') |
|
|
|
|
|
"""## XLM-Roberta |
|
### TokenFromSubtoken |
|
- Code adapted from the following [file](https://github.com/deepmipt/DeepPavlov/blob/master/deeppavlov/models/torch_bert/torch_transformers_sequence_tagger.py) |
|
- DeepPavlov is a popular open-source library for end-to-end deep learning dialog systems and chatbots.
|
- Licensed under the Apache License, Version 2.0 (the "License"); |
|
""" |
|
|
|
class TokenFromSubtoken(torch.nn.Module): |
|
|
|
def forward(self, units: torch.Tensor, mask: torch.Tensor) -> torch.Tensor: |
|
""" Assemble token level units from subtoken level units |
|
Args: |
|
units: torch.Tensor of shape [batch_size, SUBTOKEN_seq_length, n_features] |
|
mask: mask of token beginnings. For example: for tokens |
|
[[``[CLS]`` ``My``, ``capybara``, ``[SEP]``], |
|
[``[CLS]`` ``Your``, ``aar``, ``##dvark``, ``is``, ``awesome``, ``[SEP]``]] |
|
the mask will be |
|
[[0, 1, 1, 0, 0, 0, 0], |
|
[0, 1, 1, 0, 1, 1, 0]] |
|
Returns: |
|
word_level_units: Units assembled from ones in the mask. For the |
|
example above this units will correspond to the following |
|
[[``My``, ``capybara``], |
|
            [``Your``, ``aar``, ``is``, ``awesome``]]
|
the shape of this tensor will be [batch_size, TOKEN_seq_length, n_features] |
|
""" |
|
|
|
device = units.device |
|
nf_int = units.size()[-1] |
|
batch_size = units.size()[0] |
|
|
|
|
|
        # number of real tokens (word starts) in each sample
        token_seq_lengths = torch.sum(mask, 1).to(torch.int64)

        # total number of tokens across the batch
        n_words = torch.sum(token_seq_lengths)

        # length of the longest token-level sequence in the batch
        max_token_seq_len = torch.max(token_seq_lengths)
|
|
|
        # (sample index, subtoken position) pairs for every token start, shape [n_words, 2]
        idxs = torch.stack(torch.nonzero(mask, as_tuple=True), dim=1)
|
|
|
sample_ids_in_batch = torch.nn.functional.pad(input=idxs[:, 0], pad=[1, 0]) |
|
|
|
        # 1 wherever the flattened token list switches to a new sample
        a = (~torch.eq(sample_ids_in_batch[1:], sample_ids_in_batch[:-1])).to(torch.int64)
|
|
|
|
|
        q = a * torch.arange(n_words, device=device).to(torch.int64)
        # flat index of the first token of each subsequent sample, with 0 prepended for the first one
        count_to_subtract = torch.nn.functional.pad(torch.masked_select(q, q.to(torch.bool)), [1, 0])

        new_word_indices = torch.arange(n_words, device=device).to(torch.int64) - count_to_subtract[torch.cumsum(a, 0)]
|
|
|
n_total_word_elements = max_token_seq_len*torch.ones_like(token_seq_lengths, device=device).sum() |
|
word_indices_flat = (idxs[:, 0] * max_token_seq_len + new_word_indices).to(torch.int64) |
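        # new_word_indices numbers the tokens within each sample (0, 1, 2, ...);
        # word_indices_flat is each token's slot in the flattened [batch_size * max_token_seq_len] grid.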
|
|
|
|
|
        # boolean mask over the flattened output grid: True where a real token goes
        x_mask = torch.zeros(n_total_word_elements, dtype=torch.bool, device=device)
        x_mask[word_indices_flat] = torch.ones_like(word_indices_flat, device=device, dtype=torch.bool)

        # positions in the output grid that remain padding
        nonword_indices_flat = (~x_mask).nonzero().squeeze(-1)

        # subtoken vectors at token-start positions, in flattened batch order
        elements = units[mask.bool()]

        # zero vectors that will fill the padding positions
        paddings = torch.zeros_like(nonword_indices_flat, dtype=elements.dtype).unsqueeze(-1).repeat(1, nf_int).to(device)
|
|
|
|
|
        # interleave real token vectors and padding into their flat positions, then reshape
        tensor_flat_unordered = torch.cat([elements, paddings])
        _, order_idx = torch.sort(torch.cat([word_indices_flat, nonword_indices_flat]))
        tensor_flat = tensor_flat_unordered[order_idx]

        tensor = torch.reshape(tensor_flat, (-1, max_token_seq_len, nf_int))

return tensor |
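
"""#### TokenFromSubtoken: usage sketch

A minimal, illustrative sketch (shapes and values are hypothetical):

    tfs = TokenFromSubtoken()
    units = torch.randn(1, 5, 8)             # [batch, subtoken_len, features]
    mask = torch.tensor([[0, 1, 1, 0, 0]])   # [CLS] my capy ##bara [SEP] -> word starts at "my", "capy"
    token_units = tfs(units, mask)           # shape [1, 2, 8]: one vector per word
"""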
|
|
|
"""### Conditional Random Field |
|
- Code adapted from the [torchcrf library](https://pytorch-crf.readthedocs.io/en/stable/)

- We override the Viterbi decoder to make it compatible with our code
|
""" |
|
|
|
class CRF(SUPERCRF): |
|
|
|
|
|
def _viterbi_decode(self, emissions: torch.FloatTensor, |
|
mask: torch.ByteTensor) -> List[List[int]]: |
|
|
|
|
|
assert emissions.dim() == 3 and mask.dim() == 2 |
|
assert emissions.shape[:2] == mask.shape |
|
assert emissions.size(2) == self.num_tags |
|
assert mask[0].all() |
|
|
|
        seq_length, batch_size = mask.shape

        # score[b, t]: best score so far of any tag sequence ending in tag t
        score = self.start_transitions + emissions[0]
        history = []

        for i in range(1, seq_length):
            # broadcast current scores over next tags and current emissions over previous tags
            broadcast_score = score.unsqueeze(2)
            broadcast_emission = emissions[i].unsqueeze(1)

            # candidate score for every (previous tag, next tag) pair
            next_score = broadcast_score + self.transitions + broadcast_emission

            # keep the best previous tag for each next tag
            next_score, indices = next_score.max(dim=1)

            # only advance sequences that are still inside their mask
            score = torch.where(mask[i].unsqueeze(1), next_score, score)
            history.append(indices)
|
|
|
history = torch.stack(history, dim=0) |
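        # history[i-1][b, t] holds the best previous tag given that tag t is chosen at step i;
        # it is replayed in reverse below to recover the best path for each sequence.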
|
|
|
|
|
|
|
        # add the transition score to the end tag
        score += self.end_transitions

        # index of the last valid position in each sequence
        seq_ends = mask.long().sum(dim=0) - 1
        best_tags_list = []
|
|
|
        for idx in range(batch_size):
            # start from the best-scoring final tag of this sequence
            _, best_last_tag = score[idx].max(dim=0)
            best_tags = [best_last_tag]

            # follow the backpointers from the last valid position back to the first
            for i, hist in enumerate(torch.flip(history[:seq_ends[idx]], dims=(0,))):
                best_last_tag = hist[idx][best_tags[-1]]
                best_tags.append(best_last_tag)

            best_tags = torch.stack(best_tags, dim=0)

            # reverse the backtracked tags to get them in forward order
            best_tags_list.append(torch.flip(best_tags, dims=(0,)))

        best_tags_list = nn.utils.rnn.pad_sequence(best_tags_list, batch_first=True, padding_value=0)
|
|
|
return best_tags_list |
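
"""#### CRF.decode: output format

Unlike the base torchcrf implementation, which returns a list of lists, this
override returns a zero-padded tensor of tag indices. A minimal, illustrative
sketch (sizes are hypothetical):

    crf = CRF(num_tags=5, batch_first=True)
    emissions = torch.randn(2, 7, 5)
    pad_mask = torch.ones(2, 7, dtype=torch.bool)
    tags = crf.decode(emissions, pad_mask)   # torch.Tensor of shape [2, 7]
"""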
|
|
|
"""### CRFLayer |
|
- Forward: compute output logits from the backbone embeddings

- Decode: decode tag sequences using the CRF weights
|
""" |
|
|
|
class CRFLayer(nn.Module): |
|
def __init__(self, embedding_size, n_labels): |
|
|
|
super(CRFLayer, self).__init__() |
|
self.dropout = nn.Dropout(0.1) |
|
self.output_dense = nn.Linear(embedding_size,n_labels) |
|
self.crf = CRF(n_labels, batch_first=True) |
|
self.token_from_subtoken = TokenFromSubtoken() |
|
|
|
|
|
def forward(self, embedding, mask): |
|
logits = self.output_dense(self.dropout(embedding)) |
|
logits = self.token_from_subtoken(logits, mask) |
|
pad_mask = self.token_from_subtoken(mask.unsqueeze(-1), mask).squeeze(-1).bool() |
|
return logits, pad_mask |
|
|
|
|
|
def decode(self, logits, pad_mask): |
|
return self.crf.decode(logits, pad_mask) |
|
|
|
|
|
def eval_loss(self, logits, targets, pad_mask): |
|
mean_log_likelihood = self.crf(logits, targets, pad_mask, reduction='sum').mean() |
|
return -mean_log_likelihood |
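
"""#### CRFLayer: shape sketch

A minimal, illustrative sketch (sizes, label count and masks are hypothetical):

    layer = CRFLayer(embedding_size=768, n_labels=7)
    embedding = torch.randn(2, 12, 768)       # subtoken-level backbone output
    mask = torch.tensor([[0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
                         [0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0]])
    logits, pad_mask = layer(embedding, mask) # token-level logits [2, 4, 7] and padding mask [2, 4]
    tags = layer.decode(logits, pad_mask)     # tag indices, shape [2, 4]
"""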
|
|
|
"""### NERModel |
|
- XLM-RoBERTa model with a CRF layer on top
|
""" |
|
|
|
class NERModel(nn.Module): |
|
|
|
def __init__(self, n_labels:int, roberta_path:str): |
|
super(NERModel,self).__init__() |
|
self.roberta = XLMRobertaModel.from_pretrained(roberta_path) |
|
self.crf = CRFLayer(self.roberta.config.hidden_size, n_labels) |
|
|
|
|
|
def forward(self, |
|
input_ids:torch.Tensor, |
|
attention_mask:torch.Tensor, |
|
token_type_ids:torch.Tensor, |
|
mask:torch.Tensor) -> torch.Tensor: |
|
|
|
embedding = self.roberta(input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids)[0] |
|
logits, pad_mask = self.crf(embedding, mask) |
|
return logits, pad_mask |
|
|
|
|
|
@torch.no_grad() |
|
def predict(self, inputs:Tuple[torch.Tensor]) -> torch.Tensor: |
|
input_ids, attention_mask, token_type_ids, mask = inputs |
|
logits, pad_mask = self(input_ids, attention_mask, token_type_ids, mask) |
|
decoded = self.crf.decode(logits, pad_mask) |
|
return decoded, pad_mask |
|
|
|
|
|
def decode(self, logits, pad_mask): |
|
"""Decode logits using CRF weights |
|
""" |
|
return self.crf.decode(logits, pad_mask) |
|
|
|
|
|
def eval_loss(self, logits, targets, pad_mask): |
|
return self.crf.eval_loss(logits, targets, pad_mask) |
|
|
|
|
|
    def freeze_roberta(self, n_freeze:int=6):
        # freeze the whole RoBERTa backbone, then unfreeze encoder layers from `n_freeze` onward
        for param in self.roberta.parameters():
            param.requires_grad = False
|
|
|
for param in self.roberta.encoder.layer[n_freeze:].parameters(): |
|
param.requires_grad = True |
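
"""#### NERModel: construction sketch

A minimal, illustrative sketch (the number of labels is hypothetical; the call
downloads the pretrained weights on first use):

    model = NERModel(n_labels=7, roberta_path="xlm-roberta-base")
    model.freeze_roberta(n_freeze=6)   # keep the embeddings and first 6 encoder layers frozen
"""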
|
|
|
"""### NERTokenizer |
|
- NLTK sentence and word tokenization combined with the XLMRobertaTokenizerFast subword tokenizer
|
- Code adapted from the following [file](https://github.com/ugurcanozalp/multilingual-ner/blob/main/multiner/utils/custom_tokenizer.py) |
|
""" |
|
|
|
class NERTokenizer(object): |
|
|
|
MAX_LEN=512 |
|
BATCH_LENGTH_LIMT = 380 |
|
|
|
|
|
PATTERN = r'''(?x) # set flag to allow verbose regexps |
|
        (?:[A-Z]\.)+ # abbreviations, e.g. U.S.A.
|
| (?:\d+\.) # numbers |
|
| \w+(?:[-.]\w+)* # words with optional internal hyphens |
|
| \$?\d+(?:.\d+)?%? # currency and percentages, e.g. $12.40, 82% |
|
| \.\.\. # ellipsis, and special chars below, includes ], [ |
|
| [-\]\[.؟،؛;"'?,():_`“”/°º‘’″…#$%()*+<>=@\\^_{}|~❑&§\!] |
|
| \u200c |
|
''' |
|
|
|
def __init__(self, base_model:str, to_device:str='cpu'): |
|
super(NERTokenizer,self).__init__() |
|
self.roberta_tokenizer = XLMRobertaTokenizerFast.from_pretrained(base_model, do_lower_case=False, padding=True, truncation=True) |
|
self.to_device = to_device |
|
|
|
self.word_tokenizer = RegexpTokenizer(self.PATTERN) |
|
self.sent_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle') |
|
|
|
|
|
    def tokenize_batch(self, inputs, pad_to=None) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
|
batch = [inputs] if isinstance(inputs[0], str) else inputs |
|
|
|
input_ids, attention_mask, token_type_ids, mask = [], [], [], [] |
|
for tokens in batch: |
|
input_ids_tmp, attention_mask_tmp, token_type_ids_tmp, mask_tmp = self._tokenize_words(tokens) |
|
input_ids.append(input_ids_tmp) |
|
attention_mask.append(attention_mask_tmp) |
|
token_type_ids.append(token_type_ids_tmp) |
|
mask.append(mask_tmp) |
|
|
|
input_ids = pad_sequence(input_ids, batch_first=True, padding_value=self.roberta_tokenizer.pad_token_id) |
|
attention_mask = pad_sequence(attention_mask, batch_first=True, padding_value=0) |
|
token_type_ids = pad_sequence(token_type_ids, batch_first=True, padding_value=0) |
|
mask = pad_sequence(mask, batch_first=True, padding_value=0) |
|
|
|
        if input_ids.shape[-1] > self.MAX_LEN:
            # pad_sequence returns [batch, seq_len] tensors, so truncate along the last dimension
            input_ids = input_ids[:, :self.MAX_LEN]
            attention_mask = attention_mask[:, :self.MAX_LEN]
            token_type_ids = token_type_ids[:, :self.MAX_LEN]
            mask = mask[:, :self.MAX_LEN]
|
|
|
|
|
elif pad_to is not None and pad_to>input_ids.shape[1]: |
|
bs = input_ids.shape[0] |
|
padlen = pad_to-input_ids.shape[1] |
|
|
|
input_ids_append = torch.tensor([self.roberta_tokenizer.pad_token_id], dtype=torch.long).repeat([bs, padlen]).to(self.to_device) |
|
input_ids = torch.cat([input_ids, input_ids_append], dim=-1) |
|
|
|
attention_mask_append = torch.tensor([0], dtype=torch.long).repeat([bs, padlen]).to(self.to_device) |
|
attention_mask = torch.cat([attention_mask, attention_mask_append], dim=-1) |
|
|
|
token_type_ids_append = torch.tensor([0], dtype=torch.long).repeat([bs, padlen]).to(self.to_device) |
|
token_type_ids = torch.cat([token_type_ids, token_type_ids_append], dim=-1) |
|
|
|
mask_append = torch.tensor([0], dtype=torch.long).repeat([bs, padlen]).to(self.to_device) |
|
mask = torch.cat([mask, mask_append], dim=-1) |
|
|
|
|
|
        elif pad_to is not None and pad_to < input_ids.shape[1]:
            input_ids = input_ids[:, :pad_to]
            attention_mask = attention_mask[:, :pad_to]
            token_type_ids = token_type_ids[:, :pad_to]
            mask = mask[:, :pad_to]
|
|
|
if isinstance(inputs[0], str): |
|
return input_ids[0], attention_mask[0], token_type_ids[0], mask[0] |
|
else: |
|
return input_ids, attention_mask, token_type_ids, mask |
|
|
|
|
|
def _tokenize_words(self, words): |
|
roberta_tokens = [] |
|
mask = [] |
|
for word in words: |
|
subtokens = self.roberta_tokenizer.tokenize(word) |
|
roberta_tokens+=subtokens |
|
n_subtoken = len(subtokens) |
|
if n_subtoken>=1: |
|
mask = mask + [1] + [0]*(n_subtoken-1) |
|
|
|
|
|
roberta_tokens = [self.roberta_tokenizer.cls_token] + roberta_tokens + [self.roberta_tokenizer.sep_token] |
|
mask = [0] + mask + [0] |
|
input_ids = torch.tensor(self.roberta_tokenizer.convert_tokens_to_ids(roberta_tokens), dtype=torch.long).to(self.to_device) |
|
attention_mask = torch.ones(len(mask), dtype=torch.long).to(self.to_device) |
|
token_type_ids = torch.zeros(len(mask), dtype=torch.long).to(self.to_device) |
|
mask = torch.tensor(mask, dtype=torch.long).to(self.to_device) |
|
return input_ids, attention_mask, token_type_ids, mask |
|
|
|
|
|
def sent_to_token(self, raw_text): |
|
for offset, ending in self.sent_tokenizer.span_tokenize(raw_text): |
|
sub_text = raw_text[offset:ending] |
|
words, spans = [], [] |
|
flush = False |
|
total_subtoken = 0 |
|
for start, end in self.word_tokenizer.span_tokenize(sub_text): |
|
flush = True |
|
start += offset |
|
end += offset |
|
words.append(raw_text[start:end]) |
|
spans.append((start,end)) |
|
total_subtoken += len(self.roberta_tokenizer.tokenize(words[-1])) |
|
if (total_subtoken > self.BATCH_LENGTH_LIMT): |
|
|
|
yield words[:-1],spans[:-1] |
|
spans = spans[len(spans)-1:] |
|
words = words[len(words)-1:] |
|
total_subtoken = sum([len(self.roberta_tokenizer.tokenize(word)) for word in words]) |
|
flush = False |
|
|
|
if flush and len(spans) > 0: |
|
yield words,spans |
|
|
|
|
|
def prepare_row_text(self, raw_text, batch_size=16): |
|
words_list, spans_list = [], [] |
|
end_batch = False |
|
for words, spans in self.sent_to_token(raw_text): |
|
end_batch = True |
|
words_list.append(words) |
|
spans_list.append(spans) |
|
if len(spans_list) >= batch_size: |
|
input_ids, attention_mask, token_type_ids, mask = self.tokenize_batch(words_list) |
|
yield (input_ids, attention_mask, token_type_ids, mask), words_list, spans_list |
|
words_list, spans_list = [], [] |
|
if end_batch and len(words_list) > 0: |
|
input_ids, attention_mask, token_type_ids, mask = self.tokenize_batch(words_list) |
|
yield (input_ids, attention_mask, token_type_ids, mask), words_list, spans_list |
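
"""#### NERTokenizer: usage sketch

A minimal, illustrative sketch (the word list is hypothetical; the call downloads
the XLM-RoBERTa sentencepiece model on first use):

    tokenizer = NERTokenizer(base_model="xlm-roberta-base")
    input_ids, attention_mask, token_type_ids, mask = tokenizer.tokenize_batch(
        ["We", "met", "at", "8", "pm", "yesterday"])
    # `mask` marks the first subtoken of every word, as expected by TokenFromSubtoken.
"""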
|
|
|
"""### NER |
|
NER interface: we use this interface to infer the Time-Date tags of a sentence.
|
""" |
|
|
|
class NER(object): |
|
|
|
def __init__(self, model_path, tags): |
|
|
|
self.tags = tags |
|
self.device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
roberta_path = "xlm-roberta-base" |
|
self.model = NERModel(n_labels=len(self.tags), roberta_path=roberta_path).to(self.device) |
|
|
|
        # map to the active device so GPU-trained checkpoints also load on CPU-only machines
        state_dict = torch.load(model_path, map_location=self.device)
|
self.model.load_state_dict(state_dict, strict=False) |
|
|
|
self.model.eval() |
|
self.tokenizer = NERTokenizer(base_model=roberta_path, to_device=self.device) |
|
|
|
|
|
@torch.no_grad() |
|
def __call__(self, raw_text): |
|
|
|
outputs_flat, spans_flat, entities = [], [], [] |
|
for batch, words, spans in self.tokenizer.prepare_row_text(raw_text): |
|
output, pad_mask = self.model.predict(batch) |
|
outputs_flat.extend(output[pad_mask.bool()].reshape(-1).tolist()) |
|
spans_flat += sum(spans, []) |
|
|
|
for tag_idx,(start,end) in zip(outputs_flat,spans_flat): |
|
tag = self.tags[tag_idx] |
|
|
|
if tag != 'O': |
|
entities.append({'Text': raw_text[start:end], |
|
'Tag': tag, |
|
'Start':start, |
|
'End': end}) |
|
|
|
return entities |
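
"""#### NER: inference sketch

A minimal, illustrative sketch; the checkpoint path and the tag set are
hypothetical and must match the ones used during training:

    TAGS = ['O', 'B-TIM', 'I-TIM', 'B-DAT', 'I-DAT']
    ner = NER(model_path='hengam.ckpt', tags=TAGS)
    print(ner('See you tomorrow at 8 pm.'))
    # -> [{'Text': ..., 'Tag': ..., 'Start': ..., 'End': ...}, ...]
"""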