Spaces:
Runtime error
Runtime error
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys | |
from typing import Dict | |
from transformers import EvalPrediction, HfArgumentParser, TrainingArguments, is_torch_available | |
from transformers.testing_utils import ( | |
TestCasePlus, | |
execute_subprocess_async, | |
get_torch_dist_unique_port, | |
require_torch_multi_gpu, | |
require_torch_neuroncore, | |
) | |
from transformers.utils import logging | |
# Module-level logger shared by the test classes and the script entry point below.
logger = logging.get_logger(__name__)
if is_torch_available(): | |
import torch | |
from torch import nn | |
from torch.utils.data import Dataset | |
from transformers import Trainer | |
class DummyDataset(Dataset):
    """Dataset of ``length`` items in which the item at index ``i`` is ``i`` itself.

    Because every sample equals its own index, a consumer can compare what it
    received against ``range(len(dataset))`` to check completeness and order.

    Args:
        length: Value reported by ``len()``. Defaults to 101.
    """

    def __init__(self, length: int = 101):
        self.length = length

    def __len__(self):
        return self.length

    def __getitem__(self, i) -> int:
        # No bounds check: the index is echoed back unchanged.
        return i
class DummyDataCollator:
    """Callable that turns a list of features into a batch whose labels mirror its inputs."""

    def __call__(self, features):
        """Return ``{"input_ids": ..., "labels": ...}`` built from ``features``.

        Both entries are separate tensors created from the same ``features``
        sequence, so they hold equal values.
        """
        return {"input_ids": torch.tensor(features), "labels": torch.tensor(features)}
class DummyModel(nn.Module):
    """Identity model: its output is the ``input_ids`` tensor it was given."""

    def __init__(self):
        super().__init__()
        # Add some (unused) params otherwise DDP will complain.
        self.fc = nn.Linear(120, 80)

    def forward(self, input_ids, labels=None):
        """Echo ``input_ids`` back, preceded by a constant loss when labels are supplied.

        Args:
            input_ids: Tensor returned unchanged as the prediction.
            labels: Only checked for presence; its values are never read.

        Returns:
            ``input_ids`` when ``labels`` is ``None``; otherwise the tuple
            ``(loss, input_ids)`` where ``loss`` is a scalar ``0.0`` tensor
            placed on the same device as ``input_ids``.
        """
        if labels is not None:
            return torch.tensor(0.0, device=input_ids.device), input_ids
        else:
            return input_ids
class TestTrainerDistributedNeuronCore(TestCasePlus):
    """Run ``test_trainer_distributed.py`` as a two-process ``torch.distributed.run`` job."""

    # NOTE(review): `require_torch_neuroncore` is imported by this file but was not applied
    # anywhere; it is presumably the hardware guard for this test - confirm against upstream.
    @require_torch_neuroncore
    def test_trainer(self):
        """Launch the script in a subprocess; the test passes if the launch call returns."""
        master_port = get_torch_dist_unique_port()
        output_dir = self.get_auto_remove_tmp_dir()
        # Build argv as an explicit list rather than whitespace-splitting a formatted string,
        # so a test directory or output directory containing spaces stays a single argument.
        cmd = [
            sys.executable,
            "-m",
            "torch.distributed.run",
            "--nproc_per_node=2",
            f"--master_port={master_port}",
            f"{self.test_file_dir}/test_trainer_distributed.py",
            "--output_dir",
            f"{output_dir}",
        ]
        execute_subprocess_async(cmd, env=self.get_env())
        # successful return here == success - any errors would have caused an error in the sub-call
class TestTrainerDistributed(TestCasePlus):
    """Run ``test_trainer_distributed.py`` under ``torch.distributed.run`` with one process per CUDA device."""

    # NOTE(review): `require_torch_multi_gpu` is imported by this file but was not applied
    # anywhere. Without a guard, a host with no CUDA devices would launch with
    # `--nproc_per_node=0`; the decorator is presumably what prevents that - confirm upstream.
    @require_torch_multi_gpu
    def test_trainer(self):
        """Launch the script in a subprocess; the test passes if the launch call returns."""
        nproc_per_node = torch.cuda.device_count()
        master_port = get_torch_dist_unique_port()
        output_dir = self.get_auto_remove_tmp_dir()
        # Build argv as an explicit list rather than whitespace-splitting a formatted string,
        # so a test directory or output directory containing spaces stays a single argument.
        cmd = [
            sys.executable,
            "-m",
            "torch.distributed.run",
            f"--nproc_per_node={nproc_per_node}",
            f"--master_port={master_port}",
            f"{self.test_file_dir}/test_trainer_distributed.py",
            "--output_dir",
            f"{output_dir}",
        ]
        execute_subprocess_async(cmd, env=self.get_env())
        # successful return here == success - any errors would have caused an error in the sub-call
if __name__ == "__main__":
    # The script below is meant to be run under torch.distributed, on a machine with multiple GPUs:
    #
    # PYTHONPATH="src" python -m torch.distributed.run --nproc_per_node 2 ./tests/test_trainer_distributed.py --output_dir output_dir
    #
    # (script arguments such as --output_dir go after the script path, matching how the test
    # classes in this file assemble the command.)
    parser = HfArgumentParser((TrainingArguments,))
    training_args = parser.parse_args_into_dataclasses()[0]

    logger.warning(
        f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}, "
        f"distributed training: {training_args.local_rank != -1}"
    )

    def _exit_unless_successful(metrics, key):
        """Log ``metrics``; terminate with exit status 1 unless ``metrics[key]`` is exactly ``True``."""
        logger.info(metrics)
        if metrics[key] is not True:
            logger.error(metrics)
            # `sys.exit` rather than the `site`-provided `exit` helper, which is not guaranteed to exist.
            sys.exit(1)

    # Essentially, what we want to verify in the distributed case is that we get all samples back,
    # in the right order. (this is crucial for prediction for instance)
    for dataset_length in [101, 40, 7]:
        dataset = DummyDataset(dataset_length)

        def compute_metrics(p: EvalPrediction) -> Dict:
            """Report success when predictions and labels both equal ``0 .. len(dataset) - 1`` in order."""
            sequential = list(range(len(dataset)))
            success = p.predictions.tolist() == sequential and p.label_ids.tolist() == sequential
            # Only the process with local rank 0 prints the mismatch details.
            if not success and training_args.local_rank == 0:
                logger.warning(
                    "Predictions and/or labels do not match expected results:\n  - predictions: "
                    f"{p.predictions.tolist()}\n  - labels: {p.label_ids.tolist()}\n  - expected: {sequential}"
                )
            return {"success": success}

        trainer = Trainer(
            model=DummyModel(),
            args=training_args,
            data_collator=DummyDataCollator(),
            eval_dataset=dataset,
            compute_metrics=compute_metrics,
        )

        # First pass: evaluate and predict with the accumulation setting currently on the args.
        _exit_unless_successful(trainer.evaluate(), "eval_success")
        _exit_unless_successful(trainer.predict(dataset).metrics, "test_success")

        # Second pass: the same two checks with `eval_accumulation_steps` set to 2.
        trainer.args.eval_accumulation_steps = 2
        _exit_unless_successful(trainer.evaluate(), "eval_success")
        _exit_unless_successful(trainer.predict(dataset).metrics, "test_success")

        # Clear the setting again before the next dataset length is tried.
        trainer.args.eval_accumulation_steps = None