# tag_single.py
# A single input file can be split into smaller shards and annotated on different GPUs.
import argparse
import json
import os

import torch
import torch.distributed as dist
import tqdm
import stanza
from transformers import AutoTokenizer

def chunk_text(text, tokenizer, max_length=512):
    # Tokenize once, split the token ids into windows of at most max_length,
    # then decode each window back to text for Stanza.
    tokens = tokenizer(text)['input_ids']
    chunks = [tokens[i:i + max_length] for i in range(0, len(tokens), max_length)]
    return [tokenizer.decode(chunk, skip_special_tokens=True) for chunk in chunks]
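
# Minimal illustration (hypothetical strings), assuming the BERT tokenizer loaded in
# __main__: chunk_text(long_text, tokenizer) returns one decoded string per 512-token
# window, so each downstream Stanza call receives a bounded amount of text.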

def init_distributed_mode():
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()
    torch.cuda.set_device(rank)  # pin this process to the GPU matching its rank
    return rank
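
# Assumption: one process per GPU on a single node, so the global rank doubles as the
# local CUDA device index (as under `torchrun --nproc_per_node=<num_gpus>`).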

def process_single_file(file, rank, tokenizer, nlp1, nlp2, shard_lines=True):
    print(f"GPU {rank}: Processing {file.name}")
    lines = file.readlines()
    if shard_lines:
        # single-file mode: divide the lines evenly across ranks
        num_lines = len(lines)
        num_gpus = dist.get_world_size()
        lines_per_gpu = (num_lines + num_gpus - 1) // num_gpus
        start_idx = rank * lines_per_gpu
        end_idx = min(start_idx + lines_per_gpu, num_lines)
        gpu_lines = lines[start_idx:end_idx]
    else:
        # multi-file mode: this rank owns the whole file
        gpu_lines = lines
    line_batches = [gpu_lines[i:i + BATCH_SIZE] for i in range(0, len(gpu_lines), BATCH_SIZE)]
    text_batches = [" ".join(batch) for batch in line_batches]
line_annotations = []
    for text in tqdm.tqdm(text_batches, desc=f"GPU {rank}"):
        # keep each Stanza input within the tokenizer's 512-token window
        text_chunks = chunk_text(text, tokenizer)
        for chunk in text_chunks:
            doc = nlp1(chunk)
sent_annotations = []
for sent in doc.sentences:
word_annotations = []
for token, word in zip(sent.tokens, sent.words):
wa = {
'id': word.id,
'text': word.text,
'lemma': word.lemma,
'upos': word.upos,
'xpos': word.xpos,
'feats': word.feats,
'start_char': token.start_char,
'end_char': token.end_char
}
word_annotations.append(wa)
sa = {
'sent_text': sent.text,
'word_annotations': word_annotations
}
if args.parse:
sa['constituency_parse'] = __get_constituency_parse(sent, nlp2)
sent_annotations.append(sa)
line_annotations.append({'sent_annotations': sent_annotations})
    # stage this rank's output in a temporary per-rank file
    temp_filename = os.path.splitext(file.name)[0] + f'_rank{rank}.json'
    with open(temp_filename, "w") as outfile:
        json.dump(line_annotations, outfile, indent=4)
    return temp_filename
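
# Each temp file is a JSON list of {'sent_annotations': [...]} objects, one per text
# chunk; merge_files concatenates those lists (in rank order) into the final output.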

def merge_files(temp_files, output_file):
    merged_data = []
    for file in temp_files:
        with open(file, "r") as infile:
            data = json.load(infile)
            merged_data.extend(data)
        os.remove(file)  # clean up the temporary per-rank file
    with open(output_file, "w") as outfile:
        json.dump(merged_data, outfile, indent=4)

def run_on_gpu(rank, args, tokenizer, nlp1, nlp2):
    print(f"Running on rank {rank}, using GPU {torch.cuda.current_device()}")
    if len(args.path) == 1:
        # single-file mode: every rank annotates its own slice of the lines
        process_single_file(args.path[0], rank, tokenizer, nlp1, nlp2)
        dist.barrier()  # wait until every rank has written its temp file
        if rank == 0:
            # merge the per-rank outputs; iterating ranks in order preserves line order
            base = os.path.splitext(args.path[0].name)[0]
            temp_files = [base + f'_rank{r}.json' for r in range(dist.get_world_size())]
            merge_files(temp_files, base + '_merged.json')
    else:
        # multi-file mode: divide the files across ranks; each rank annotates
        # its files in full, so line sharding is disabled
        files_per_gpu = len(args.path) // dist.get_world_size()
        start_idx = rank * files_per_gpu
        end_idx = start_idx + files_per_gpu if rank != dist.get_world_size() - 1 else len(args.path)
        gpu_files = args.path[start_idx:end_idx]
        for file in gpu_files:
            process_single_file(file, rank, tokenizer, nlp1, nlp2, shard_lines=False)

def __get_constituency_parse(sent, nlp):
    # re-run the constituency pipeline on the raw sentence text; give up on failure
    try:
        parse_doc = nlp(sent.text)
    except Exception:
        return None
    parse_trees = [str(parsed_sent.constituency) for parsed_sent in parse_doc.sentences]
    return "(ROOT " + " ".join(parse_trees) + ")"

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='Tag BabyLM dataset',
        description='Tag BabyLM dataset using Stanza')
    parser.add_argument('path', type=argparse.FileType('r'),
                        nargs='+', help="Path to file(s)")
    parser.add_argument('-p', '--parse', action='store_true',
                        help="Include constituency parse")
    args = parser.parse_args()

    rank = init_distributed_mode()
    BATCH_SIZE = 1000  # lines per text batch; read as a global by process_single_file
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    # tokenize/POS/lemma pipeline; the constituency pipeline is built only when needed
    nlp1 = stanza.Pipeline(lang='en', processors='tokenize,pos,lemma', package="default_accurate", use_gpu=True)
    nlp2 = None
    if args.parse:
        nlp2 = stanza.Pipeline(lang='en', processors='tokenize,pos,constituency', package="default_accurate", use_gpu=True)
    run_on_gpu(rank, args, tokenizer, nlp1, nlp2)
    dist.destroy_process_group()
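
# Example launch (assuming torchrun-style env:// initialization; adjust
# --nproc_per_node to the number of available GPUs):
#   torchrun --nproc_per_node=4 tag_single.py corpus.txt --parse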