# A single input file can be split across ranks and processed on multiple GPUs.
import torch
import torch.distributed as dist
import sys
sys.path.append("..")
import tqdm
import os
import argparse
import stanza
import json
from transformers import AutoTokenizer


def chunk_text(text, tokenizer, max_length=512):
    """Split text into chunks of at most max_length tokens, decoded back to strings."""
    tokens = tokenizer(text)['input_ids']
    chunks = [tokens[i:i + max_length] for i in range(0, len(tokens), max_length)]
    return [tokenizer.decode(chunk, skip_special_tokens=True) for chunk in chunks]


def init_distributed_mode():
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()
    torch.cuda.set_device(rank)  # bind this process to the GPU matching its rank
    return rank


def process_single_file(file, rank, tokenizer, nlp1, nlp2, split_lines=True):
    # Relies on the module-level globals BATCH_SIZE and args set in __main__.
    print(f"GPU {rank}: Processing {file.name}")
    lines = file.readlines()

    if split_lines:
        # Divide the file's lines evenly across GPUs; this rank handles only its slice.
        num_lines = len(lines)
        num_gpus = dist.get_world_size()
        lines_per_gpu = (num_lines + num_gpus - 1) // num_gpus
        start_idx = rank * lines_per_gpu
        end_idx = min(start_idx + lines_per_gpu, num_lines)
        gpu_lines = lines[start_idx:end_idx]
    else:
        # Multi-file mode: the whole file belongs to this rank.
        gpu_lines = lines

    line_batches = [gpu_lines[i:i + BATCH_SIZE] for i in range(0, len(gpu_lines), BATCH_SIZE)]
    text_batches = [" ".join(batch) for batch in line_batches]

    line_annotations = []
    for text in tqdm.tqdm(text_batches, desc=f"GPU {rank}"):
        text_chunks = chunk_text(text, tokenizer)
        for chunk in text_chunks:
            doc = nlp1(chunk)
            sent_annotations = []
            for sent in doc.sentences:
                word_annotations = []
                for token, word in zip(sent.tokens, sent.words):
                    wa = {
                        'id': word.id,
                        'text': word.text,
                        'lemma': word.lemma,
                        'upos': word.upos,
                        'xpos': word.xpos,
                        'feats': word.feats,
                        'start_char': token.start_char,
                        'end_char': token.end_char
                    }
                    word_annotations.append(wa)
                sa = {
                    'sent_text': sent.text,
                    'word_annotations': word_annotations
                }
                if args.parse:
                    sa['constituency_parse'] = __get_constituency_parse(sent, nlp2)
                sent_annotations.append(sa)
            line_annotations.append({'sent_annotations': sent_annotations})

    # Write this rank's output to a temporary file; rank 0 merges them later.
    temp_filename = os.path.splitext(file.name)[0] + f'_rank{rank}.json'
    with open(temp_filename, "w") as outfile:
        json.dump(line_annotations, outfile, indent=4)
    return temp_filename


def merge_files(temp_files, output_file):
    merged_data = []
    for file in temp_files:
        with open(file, "r") as infile:
            data = json.load(infile)
            merged_data.extend(data)
        os.remove(file)  # remove the temporary per-rank file
    with open(output_file, "w") as outfile:
        json.dump(merged_data, outfile, indent=4)


def run_on_gpu(rank, args, tokenizer, nlp1, nlp2):
    print(f"Running on rank {rank}, using GPU {torch.cuda.current_device()}")
    if len(args.path) == 1:
        # Single file: every rank annotates its own slice of the lines.
        process_single_file(args.path[0], rank, tokenizer, nlp1, nlp2, split_lines=True)
        dist.barrier()  # wait until every rank has written its temporary file
        if rank == 0:
            # Merge the per-rank outputs into one file.
            base = os.path.splitext(args.path[0].name)[0]
            temp_files = [f"{base}_rank{r}.json" for r in range(dist.get_world_size())]
            merge_files(temp_files, base + '_merged.json')
    else:
        # Multiple files: distribute whole files across ranks.
        files_per_gpu = len(args.path) // dist.get_world_size()
        start_idx = rank * files_per_gpu
        end_idx = start_idx + files_per_gpu if rank != dist.get_world_size() - 1 else len(args.path)
        gpu_files = args.path[start_idx:end_idx]
        for file in gpu_files:
            process_single_file(file, rank, tokenizer, nlp1, nlp2, split_lines=False)


def __get_constituency_parse(sent, nlp):
    try:
        parse_doc = nlp(sent.text)
    except Exception:
        return None
    parse_trees = [str(parsed_sent.constituency) for parsed_sent in parse_doc.sentences]
    return "(ROOT " + " ".join(parse_trees) + ")"


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='Tag BabyLM dataset',
        description='Tag BabyLM dataset using Stanza')
    parser.add_argument('path', type=argparse.FileType('r'), nargs='+',
                        help="Path to file(s)")
    parser.add_argument('-p', '--parse', action='store_true',
                        help="Include constituency parse")
    args = parser.parse_args()

    rank = init_distributed_mode()
    BATCH_SIZE = 1000
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    nlp1 = stanza.Pipeline(lang='en', processors='tokenize,pos,lemma',
                           package="default_accurate", use_gpu=True)
    nlp2 = None
    if args.parse:
        nlp2 = stanza.Pipeline(lang='en', processors='tokenize,pos,constituency',
                               package="default_accurate", use_gpu=True)

    run_on_gpu(rank, args, tokenizer, nlp1, nlp2)
    dist.destroy_process_group()
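
# Example launch, given that init_distributed_mode() reads the standard torch.distributed
# environment variables: one process per GPU via torchrun (assumption: the script file name
# and data path below are illustrative, not part of this repository).
#
#   torchrun --nproc_per_node=4 tag_dataset.py data/train.txt --parse
#
# Each rank writes <input>_rank<r>.json; in single-file mode rank 0 then merges them
# into <input>_merged.json.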