# Export FrameNet annotations (full-text docs and exemplars) to Concrete archives.
import argparse
import os
from collections import defaultdict
from typing import Dict, Any
from concrete.util import CommunicationWriterTGZ
from nltk.corpus import framenet, framenet15
from tqdm import tqdm
from sftp.data_reader.concrete_srl import concrete_doc
from tools.framenet.fn_util import framenet_split, Sentence as TokSentence
def process_sentence(sent) -> Dict[str, Any]:
    """Convert one FrameNet sentence into a plain-dict SRL record.

    :param sent: a FrameNet sentence object; must expose ``.text`` and may
        carry ``annotationSet`` (full-text) or a top-level ``Target``
        (exemplar). NOTE(review): assumed from NLTK's AttrDict interface.
    :return: dict with keys ``sentence`` (raw text), ``tokenization``
        (list of inclusive character spans per token), and ``annotations``
        (one event per frame: ``label``, token ``span``, FE ``children``).
    """
    ret = {'sentence': sent.text, 'tokenization': list(), 'annotations': list()}
    tok_sent = TokSentence(sent.text)
    for token in tok_sent.tokens:
        # Inclusive (start, end) character offsets for each token.
        ret['tokenization'].append((token.idx, token.idx_end - 1))

    def process_one_ann_set(ann_set):
        # Convert one annotation set (frame instance) and append it to `ret`.
        arg_list = list()
        event = {'label': ann_set.frame.name, 'children': arg_list}
        ret['annotations'].append(event)
        target_list = list()
        for tar_start, tar_end in ann_set.Target:
            # Hoisted: the span lookup was previously computed twice per target.
            span_start, span_end = tok_sent.span(tar_start, tar_end)
            target_list.extend(range(span_start, span_end + 1))
        target_list.sort()
        # Target span covers the min..max target token (targets may be discontiguous).
        event['span'] = (target_list[0], target_list[-1])
        for fe_start, fe_end, fe_name in ann_set.FE[0]:
            # FE[0] holds the span-annotated frame elements; convert char -> token spans.
            fe_start, fe_end = tok_sent.span(fe_start, fe_end)
            arg_list.append({
                'span': (fe_start, fe_end),
                'label': fe_name
            })

    if 'annotationSet' in sent:
        # Full-text sentence: possibly several frame annotation sets.
        for ann_item in sent.annotationSet:
            if 'Target' not in ann_item:
                # Skip sets without a target (e.g. pure POS layers).
                continue
            process_one_ann_set(ann_item)
    if 'Target' in sent:
        # Exemplar sentence: the sentence itself is a single annotation set.
        process_one_ann_set(sent)
    return ret
def process_doc(docs, dst_path: str):
    """Serialize FrameNet full-text documents into a .tar.gz of Concrete files.

    :param docs: iterable of FrameNet document objects (each with ``.sentence``
        and ``.filename``).
    :param dst_path: destination ``.tar.gz`` archive path.
    """
    writer = CommunicationWriterTGZ(dst_path)
    try:
        for doc in tqdm(docs):
            sentences = [process_sentence(sent) for sent in doc.sentence]
            comm = concrete_doc(sentences, doc.filename)
            writer.write(comm, comm.id + '.concrete')
    finally:
        # Always finalize the archive, even if a document fails to convert;
        # the original leaked the writer on any exception.
        writer.close()
def process_exemplar(dst_path, fn):
    """Serialize FrameNet exemplar sentences into a single-file .tar.gz archive.

    :param dst_path: destination ``.tar.gz`` path.
    :param fn: FrameNet corpus reader (``framenet`` or ``framenet15``).
    """
    bar = tqdm()
    raw_annotations = list()
    print('Loading exemplars...')
    try:
        for ann_sent in fn.annotations(full_text=False):
            if 'Target' not in ann_sent:
                # Only keep sentences that carry a frame target.
                continue
            bar.update()
            raw_annotations.append(ann_sent)
    except RuntimeError:
        # NLTK's exemplar iterator can raise RuntimeError partway through
        # (corpus quirk); keep whatever was successfully loaded before it.
        pass
    finally:
        bar.close()
    # (Removed an unused running character-offset accumulator from the original.)
    sentences = [process_sentence(sent) for sent in raw_annotations]
    comm = concrete_doc(sentences, 'exemplar')
    writer = CommunicationWriterTGZ(dst_path)
    try:
        writer.write(comm, 'exemplar.concrete')
    finally:
        # The original never closed the writer, risking an unfinalized tarball.
        writer.close()
def run():
    """CLI entry point: export FrameNet docs (train/dev/test) and exemplars.

    Reads the destination folder and FrameNet version from the command line,
    groups full-text documents by split, and writes one ``.tar.gz`` per split
    plus one exemplar archive.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'dst', metavar='DESTINATION', type=str,
        help='Destination folder path.'
    )
    parser.add_argument(
        '-v', metavar='VERSION', default='1.7', type=str, choices=['1.5', '1.7'],
        help='Version of FrameNet. Either 1.5 or 1.7.'
    )
    args = parser.parse_args()

    fn = framenet if args.v == '1.7' else framenet15
    os.makedirs(args.dst, exist_ok=True)

    # Bucket full-text documents into splits; anything not in dev/test is train.
    doc_group = defaultdict(list)
    for doc in fn.docs():
        split_name = 'train'
        if doc.filename in framenet_split['dev']:
            split_name = 'dev'
        elif doc.filename in framenet_split['test']:
            split_name = 'test'
        doc_group[split_name].append(doc)

    for sp in framenet_split:
        print(f'Loaded {len(doc_group[sp])} docs for {sp}.')
    for sp in framenet_split:
        process_doc(doc_group[sp], dst_path=os.path.join(args.dst, f'{sp}.tar.gz'))
    process_exemplar(os.path.join(args.dst, 'exemplar.tar.gz'), fn)
if __name__ == '__main__':
    # Script entry point.
    run()