Impossible_llm / data /tag_distributed.py
Yaning1001's picture
Add files using upload-large-folder tool
7332c68 verified
# Input files are split across GPUs (one process per GPU); each file is processed entirely by a single GPU.
import torch
import torch.distributed as dist
import sys
sys.path.append("..")
import pytest
import glob
import tqdm
import os
import argparse
import stanza
import json
from transformers import AutoTokenizer
def chunk_text(text, tokenizer, max_length=512):
    """Split *text* into pieces of at most *max_length* tokenizer tokens.

    Args:
        text: The string to split.
        tokenizer: Object that, when called with a string, returns a mapping
            with an ``'input_ids'`` sequence, and that provides
            ``decode(ids, skip_special_tokens=...)``.
        max_length: Maximum number of token ids per piece.

    Returns:
        A list of decoded strings, one per consecutive window of token ids,
        in their original order.
    """
    input_ids = tokenizer(text)['input_ids']
    decoded_pieces = []
    # Walk the id sequence in fixed-size, non-overlapping windows.
    for offset in range(0, len(input_ids), max_length):
        window = input_ids[offset:offset + max_length]
        decoded_pieces.append(
            tokenizer.decode(window, skip_special_tokens=True)
        )
    return decoded_pieces
def init_distributed_mode():
    """Initialise the NCCL process group and bind this process to a GPU.

    The CUDA device is chosen from the node-local rank rather than the global
    rank: on a multi-node launch the global rank can exceed the number of GPUs
    on the node, which makes ``torch.cuda.set_device(rank)`` fail. On a
    single-node launch the two values coincide, so behaviour is unchanged.

    Returns:
        The global rank of this process within the process group.
    """
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()

    # Prefer the LOCAL_RANK environment variable when the launcher provides
    # it; otherwise fall back to wrapping the global rank onto the GPUs
    # visible on this node.
    local_rank = os.environ.get("LOCAL_RANK")
    if local_rank is not None:
        device_index = int(local_rank)
    else:
        device_index = rank % max(torch.cuda.device_count(), 1)

    torch.cuda.set_device(device_index)  # select the GPU for this process
    return rank
def _annotate_sentence(sent, nlp2, parse):
    """Build the annotation dict for a single sentence of a tagged document.

    Args:
        sent: Sentence object exposing ``text``, ``tokens`` and ``words``.
        nlp2: Pipeline passed to ``__get_constituency_parse``; only used when
            *parse* is true.
        parse: Whether to add a ``'constituency_parse'`` entry.

    Returns:
        A dict with ``'sent_text'``, ``'word_annotations'`` and, when *parse*
        is true, ``'constituency_parse'``.
    """
    word_annotations = [
        {
            'id': word.id,
            'text': word.text,
            'lemma': word.lemma,
            'upos': word.upos,
            'xpos': word.xpos,
            'feats': word.feats,
            'start_char': token.start_char,
            'end_char': token.end_char,
        }
        for token, word in zip(sent.tokens, sent.words)
    ]
    annotation = {
        'sent_text': sent.text,
        'word_annotations': word_annotations,
    }
    if parse:
        annotation['constituency_parse'] = __get_constituency_parse(sent, nlp2)
    return annotation


def run_on_gpu(rank, args, tokenizer, nlp1, nlp2):
    """Tag this rank's share of the input files and write one JSON per file.

    Each file is read, its stripped lines are joined into batches of
    ``BATCH_SIZE`` lines (a module-level global), every batch is split into
    tokenizer-sized chunks via ``chunk_text``, and every chunk is run through
    *nlp1*. One ``{'sent_annotations': [...]}`` entry is recorded per chunk.

    Args:
        rank: Global rank of this process; selects which files it handles.
        args: Parsed arguments with ``path`` (list of open text files) and
            ``parse`` (bool: include constituency parses).
        tokenizer: Tokenizer forwarded to ``chunk_text``.
        nlp1: Pipeline called on each chunk to produce sentences/words.
        nlp2: Pipeline used for constituency parsing when ``args.parse``.

    Side effects:
        Writes ``<input stem>.json`` (or ``<input stem>_parsed.json`` when
        ``args.parse`` is set) next to each processed input file, and closes
        each input file after reading it.
    """
    print(f"Running on Rank {rank}, using GPU {torch.cuda.current_device()}")
    print(f"Rank {rank}, GPU {torch.cuda.current_device()} started")

    # Strided split: rank r takes files r, r + world_size, ... so the load is
    # balanced and ranks still get work when there are fewer files than
    # ranks (floor division would hand every file to the last rank).
    world_size = dist.get_world_size()
    gpu_files = args.path[rank::world_size]

    # Chosen once; the suffix is appended to the input stem below. (Building
    # this inline as ``stem + a if cond else b`` would yield just ``b``.)
    suffix = '_parsed.json' if args.parse else '.json'

    for file in gpu_files:
        print(f"GPU {rank}: Processing {file.name}")

        # Close the argparse-opened handle as soon as its content is read.
        with file:
            lines = [line.strip() for line in file]

        line_batches = [lines[i:i + BATCH_SIZE] for i in range(0, len(lines), BATCH_SIZE)]
        text_batches = [" ".join(batch) for batch in line_batches]

        line_annotations = []
        for text in tqdm.tqdm(text_batches, desc=f"GPU {rank}"):
            for chunk in chunk_text(text, tokenizer):
                doc = nlp1(chunk)
                sent_annotations = [
                    _annotate_sentence(sent, nlp2, args.parse)
                    for sent in doc.sentences
                ]
                line_annotations.append({'sent_annotations': sent_annotations})

        json_filename = os.path.splitext(file.name)[0] + suffix
        with open(json_filename, "w") as outfile:
            json.dump(line_annotations, outfile, indent=4)
def __get_constituency_parse(sent, nlp):
    """Return a bracketed constituency parse string for *sent*.

    The sentence text is re-run through *nlp*; the constituency tree of every
    sentence in the resulting document is stringified and the pieces are
    wrapped in a single ``(ROOT ...)`` node.

    Args:
        sent: Object with a ``text`` attribute holding the sentence string.
        nlp: Callable that takes the text and returns a document whose
            ``sentences`` each expose a ``constituency`` attribute.

    Returns:
        The combined parse string, or ``None`` if *nlp* raised an error.
    """
    try:
        parse_doc = nlp(sent.text)
    except Exception:
        # Best-effort: a sentence the parser cannot handle yields no parse.
        # Only ``Exception`` is caught so that KeyboardInterrupt/SystemExit
        # still stop the run.
        return None
    parse_trees = [str(parsed.constituency) for parsed in parse_doc.sentences]
    return "(ROOT " + " ".join(parse_trees) + ")"
if __name__ == "__main__":
    # Script entry point: parse arguments, join the process group, build the
    # tokenizer and Stanza pipelines, then tag this rank's files.
    parser = argparse.ArgumentParser(
        prog='Tag BabyLM dataset',
        description='Tag BabyLM dataset using Stanza')
    parser.add_argument('path', type=argparse.FileType('r'),
                        nargs='+', help="Path to file(s)")
    parser.add_argument('-p', '--parse', action='store_true',
                        help="Include constituency parse")
    args = parser.parse_args()

    # Number of input lines joined into one text batch. Kept as a
    # module-level global because run_on_gpu reads it by name.
    BATCH_SIZE = 1000

    try:
        rank = init_distributed_mode()

        tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        nlp1 = stanza.Pipeline(lang='en', processors='tokenize,pos,lemma', package="default_accurate", use_gpu=True)

        # The constituency pipeline is only built when parses are requested.
        nlp2 = None
        if args.parse:
            nlp2 = stanza.Pipeline(lang='en', processors='tokenize,pos,constituency', package="default_accurate", use_gpu=True)

        run_on_gpu(rank, args, tokenizer, nlp1, nlp2)
    finally:
        # Tear the process group down on success and on failure so its
        # resources are released before the interpreter exits.
        if dist.is_initialized():
            dist.destroy_process_group()