# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import os
import re
import sys
import unittest
from pathlib import Path
from typing import Optional, Tuple
from unittest.mock import patch

from parameterized import parameterized

from transformers.testing_utils import (
    CaptureStderr,
    ExtendSysPath,
    TestCasePlus,
    execute_subprocess_async,
    get_gpu_count,
    get_torch_dist_unique_port,
    require_apex,
    require_bitsandbytes,
    require_fairscale,
    require_torch,
    require_torch_gpu,
    require_torch_multi_gpu,
    require_torch_non_multi_gpu,
    slow,
)
from transformers.trainer_callback import TrainerState
from transformers.trainer_utils import set_seed


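# make the run_translation example importable so that its main() can be invoked in-process below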
bindir = os.path.abspath(os.path.dirname(__file__))
with ExtendSysPath(f"{bindir}/../../examples/pytorch/translation"):
    from run_translation import main  # noqa


set_seed(42)
MARIAN_MODEL = "sshleifer/student_marian_en_ro_6_1"
MBART_TINY = "sshleifer/tiny-mbart"


@require_torch
class TestTrainerExt(TestCasePlus):
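    # quick smoke test: trains a tiny mbart model for one epoch on the tiny wmt_en_ro fixture and
    # sanity-checks the eval metrics recorded in trainer_state.json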
    def run_seq2seq_quick(
        self,
        distributed=False,
        extra_args_str=None,
        predict_with_generate=True,
        do_train=True,
        do_eval=True,
        do_predict=True,
    ):
        output_dir = self.run_trainer(
            eval_steps=1,
            max_len=12,
            model_name=MBART_TINY,
            num_train_epochs=1,
            distributed=distributed,
            extra_args_str=extra_args_str,
            predict_with_generate=predict_with_generate,
            do_train=do_train,
            do_eval=do_eval,
            do_predict=do_predict,
        )
        logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history

        if not do_eval:
            return

        eval_metrics = [log for log in logs if "eval_loss" in log.keys()]

        first_step_stats = eval_metrics[0]
        if predict_with_generate:
            assert "eval_bleu" in first_step_stats

            last_step_stats = eval_metrics[-1]
            assert isinstance(last_step_stats["eval_bleu"], float)
            assert not math.isnan(float(last_step_stats["eval_loss"])), "eval_loss must not be `nan`"

    @require_torch_non_multi_gpu
    def test_run_seq2seq_no_dist(self):
        self.run_seq2seq_quick()

    # verify that the trainer can handle non-distributed with n_gpu > 1
    @require_torch_multi_gpu
    def test_run_seq2seq_dp(self):
        self.run_seq2seq_quick(distributed=False)

    # verify that the trainer can handle distributed with n_gpu > 1
    @require_torch_multi_gpu
    def test_run_seq2seq_ddp(self):
        self.run_seq2seq_quick(distributed=True)

    # test --sharded_ddp w/o --fp16
    @unittest.skip("Requires an update of the env running those tests")
    @require_torch_multi_gpu
    @require_fairscale
    def test_run_seq2seq_sharded_ddp(self):
        self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp simple")

    # test --sharded_ddp w/ --fp16
    @unittest.skip("Requires an update of the env running those tests")
    @require_torch_multi_gpu
    @require_fairscale
    def test_run_seq2seq_sharded_ddp_fp16(self):
        self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp simple --fp16")

    # test --sharded_ddp zero_dp_2 w/o --fp16
    @unittest.skip("Requires an update of the env running those tests")
    @require_torch_multi_gpu
    @require_fairscale
    def test_run_seq2seq_fully_sharded_ddp(self):
        self.run_seq2seq_quick(distributed=True, extra_args_str="--sharded_ddp zero_dp_2", predict_with_generate=False)

    # test --sharded_ddp zero_dp_2 w/ --fp16
    @unittest.skip("Requires an update of the env running those tests")
    @require_torch_multi_gpu
    @require_fairscale
    def test_run_seq2seq_fully_sharded_ddp_fp16(self):
        self.run_seq2seq_quick(
            distributed=True, extra_args_str="--sharded_ddp zero_dp_2 --fp16", predict_with_generate=False
        )

    @require_apex
    @require_torch_gpu
    def test_run_seq2seq_apex(self):
        # XXX: apex breaks the trainer if it's run twice, e.g. run_translation.main() called twice from
        # the same program, and it also breaks other tests that run from the same pytest worker.
        # Therefore, until this is sorted out, it must be run only in an external program, i.e. with
        # distributed=True in this test, and only on one or more gpus - if we want a cpu version we
        # will need a special test.
        #
        # specifically, the problem was traced to self.optimizer.step() - if it's run a 2nd time via a
        # 2nd main() call it botches the subsequent eval.
        #
        self.run_seq2seq_quick(distributed=True, extra_args_str="--fp16 --fp16_backend=apex")
        # test a 2nd time - this run was previously ending up with eval_loss: nan
        # to reproduce the problem set distributed=False
        self.run_seq2seq_quick(distributed=True, extra_args_str="--fp16 --fp16_backend=apex")

    @parameterized.expand(["base", "low", "high", "mixed"])
    @require_torch_multi_gpu
    def test_trainer_log_level_replica(self, experiment_id):
        # as each sub-test is slow-ish, the test is split into multiple sub-tests to avoid a CI timeout
        experiments = {
            # test with the default log_level - should be info and thus log info once
            "base": {"extra_args_str": "", "n_matches": 1},
            # test with low log_level and log_level_replica - should be noisy on all processes
            # now the info string should appear twice on 2 processes
            "low": {"extra_args_str": "--log_level debug --log_level_replica debug", "n_matches": 2},
            # test with high log_level and low log_level_replica
            # now the info string should appear once only on the replica
            "high": {"extra_args_str": "--log_level error --log_level_replica debug", "n_matches": 1},
            # test with high log_level and log_level_replica - should be quiet on all processes
            "mixed": {"extra_args_str": "--log_level error --log_level_replica error", "n_matches": 0},
        }

        data = experiments[experiment_id]
        kwargs = {"distributed": True, "predict_with_generate": False, "do_eval": False, "do_predict": False}
        log_info_string = "Running training"
        with CaptureStderr() as cl:
            self.run_seq2seq_quick(**kwargs, extra_args_str=data["extra_args_str"])
        n_matches = len(re.findall(log_info_string, cl.err))
        self.assertEqual(n_matches, data["n_matches"])

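    # full (slow) run with the real student marian model: verifies that eval_loss improves during
    # training and that do_predict writes the generated predictions and metrics files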
    @slow
    def test_run_seq2seq(self):
        output_dir = self.run_trainer(
            eval_steps=2,
            max_len=128,
            model_name=MARIAN_MODEL,
            learning_rate=3e-4,
            num_train_epochs=10,
            distributed=False,
        )

        # Check metrics
        logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history
        eval_metrics = [log for log in logs if "eval_loss" in log.keys()]
        first_step_stats = eval_metrics[0]
        last_step_stats = eval_metrics[-1]

        assert first_step_stats["eval_loss"] > last_step_stats["eval_loss"], "model learned nothing"
        assert isinstance(last_step_stats["eval_bleu"], float)

        # test if do_predict saves generations and metrics
        contents = os.listdir(output_dir)
        contents = {os.path.basename(p) for p in contents}
        assert "generated_predictions.txt" in contents
        assert "predict_results.json" in contents

    @slow
    @require_bitsandbytes
    def test_run_seq2seq_bnb(self):
        from transformers.training_args import OptimizerNames

        def train_and_return_metrics(optim: str) -> Tuple[int, int, float]:
            extra_args = "--skip_memory_metrics 0"

            output_dir = self.run_trainer(
                max_len=128,
                model_name=MARIAN_MODEL,
                learning_rate=3e-4,
                num_train_epochs=1,
                optim=optim,
                distributed=True,  # force run in a new process
                extra_args_str=extra_args,
                do_eval=False,
                do_predict=False,
                n_gpus_to_use=1,  # to allow deterministic fixed memory usage
            )

            # Check metrics
            logs = TrainerState.load_from_json(Path(output_dir, "trainer_state.json")).log_history
            gpu_peak_mem_mb = int(logs[0]["train_mem_gpu_peaked_delta"] / 2**20)
            gpu_alloc_mem_mb = int(logs[0]["train_mem_gpu_alloc_delta"] / 2**20)

            loss = logs[0]["train_loss"]
            return gpu_peak_mem_mb, gpu_alloc_mem_mb, loss

        gpu_peak_mem_orig, gpu_alloc_mem_orig, loss_orig = train_and_return_metrics(OptimizerNames.ADAMW_TORCH.value)
        gpu_peak_mem_bnb, gpu_alloc_mem_bnb, loss_bnb = train_and_return_metrics(OptimizerNames.ADAMW_BNB.value)

        gpu_alloc_mem_diff = gpu_alloc_mem_orig - gpu_alloc_mem_bnb

        gpu_total_mem_orig = gpu_peak_mem_orig + gpu_alloc_mem_orig
        gpu_total_mem_bnb = gpu_peak_mem_bnb + gpu_alloc_mem_bnb
        gpu_total_mem_diff = gpu_total_mem_orig - gpu_total_mem_bnb

        # sshleifer/student_marian_en_ro_6_1 has 54M parameters, 29M of which are `nn.Embedding` weights,
        # which don't get quantized and remain in fp32. Therefore we only have 25M parameters with
        # quantized 2-byte optimizer states, and the diff in optimizer memory usage is derived as follows:
        #
        # - normal 25M*8 = ~200MB (8 bytes of optimizer state per param)
        # - bnb    25M*2 =  ~50MB (2 bytes of optimizer state per param)
        #
        # Thus we should expect ~150MB total memory saved.
        #
        # Peak memory should be the same - the total should be different by about that same margin
        #
        # After leaving a small margin to accommodate for differences between gpus let's check
        # that we have at least 120MB in savings
        expected_savings = 120

        # uncomment the following if this test starts failing - requires py38 for a new print feature
        # gpu_peak_mem_diff = gpu_peak_mem_orig - gpu_peak_mem_bnb
        # print(f"{gpu_alloc_mem_orig=}MB {gpu_peak_mem_orig=}MB {gpu_alloc_mem_orig+gpu_peak_mem_orig=}MB")
        # print(f" {gpu_alloc_mem_bnb=}MB  {gpu_peak_mem_bnb=}MB  {gpu_alloc_mem_bnb+gpu_peak_mem_bnb=}MB")
        # print(f"{gpu_alloc_mem_diff=}MB")
        # print(f"{gpu_peak_mem_diff=}MB")
        # print(f"{gpu_total_mem_orig=}MB, {gpu_total_mem_bnb=}MB")
        # print(f"{gpu_total_mem_diff=}MB, {gpu_total_mem_diff=}MB")

        self.assertGreater(
            gpu_alloc_mem_diff,
            expected_savings,
            "should use ~150MB less alloc gpu memory with BNB, compared to without it for this model but got"
            f" a difference of {gpu_alloc_mem_diff}MB, with gpu_alloc_mem_orig={gpu_alloc_mem_orig}MB and"
            f" gpu_alloc_mem_bnb={gpu_alloc_mem_bnb}MB",
        )

        self.assertGreater(
            gpu_total_mem_diff,
            expected_savings,
            "should use ~150MB less total gpu memory with BNB, compared to without it for this model but got"
            f" a difference of {gpu_total_mem_diff}MB, with gpu_total_mem_orig={gpu_total_mem_orig}MB and"
            f" gpu_total_mem_bnb={gpu_total_mem_bnb}MB",
        )

        self.assertEqual(
            loss_orig, loss_bnb, f"loss should be the same, but got loss_orig={loss_orig}, loss_bnb={loss_bnb}"
        )

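    # builds the run_translation.py command line from the given options and runs it, either
    # in-process or via torch.distributed.run, returning the output dir with the saved artifacts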
    def run_trainer(
        self,
        max_len: int,
        model_name: str,
        num_train_epochs: int,
        learning_rate: float = 3e-3,
        optim: str = "adafactor",
        distributed: bool = False,
        extra_args_str: Optional[str] = None,
        eval_steps: int = 0,
        predict_with_generate: bool = True,
        do_train: bool = True,
        do_eval: bool = True,
        do_predict: bool = True,
        n_gpus_to_use: Optional[int] = None,
    ):
        data_dir = self.test_file_dir / "../fixtures/tests_samples/wmt_en_ro"
        output_dir = self.get_auto_remove_tmp_dir()
        args_train = f"""
            --model_name_or_path {model_name}
            --train_file {data_dir}/train.json
            --validation_file {data_dir}/val.json
            --test_file {data_dir}/test.json
            --output_dir {output_dir}
            --overwrite_output_dir
            --max_train_samples 8
            --max_source_length {max_len}
            --max_target_length {max_len}
            --do_train
            --num_train_epochs {str(num_train_epochs)}
            --per_device_train_batch_size 4
            --learning_rate {learning_rate}
            --warmup_steps 8
            --logging_steps 0
            --logging_strategy no
            --save_steps {str(eval_steps)}
            --group_by_length
            --label_smoothing_factor 0.1
            --target_lang ro_RO
            --source_lang en_XX
        """.split()

        args_eval = f"""
            --do_eval
            --per_device_eval_batch_size 4
            --max_eval_samples 8
            --val_max_target_length {max_len}
            --evaluation_strategy steps
            --eval_steps {str(eval_steps)}
        """.split()

        args_predict = """
            --do_predict
        """.split()

        args = []
        if do_train:
            args += args_train

        if do_eval:
            args += args_eval

        if do_predict:
            args += args_predict

        if predict_with_generate:
            args += "--predict_with_generate".split()

        if do_train:
            if optim == "adafactor":
                args += "--adafactor".split()
            else:
                args += f"--optim {optim}".split()

        if extra_args_str is not None:
            args += extra_args_str.split()

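        # in distributed mode, launch run_translation.py via torch.distributed.run in a fresh
        # subprocess; otherwise, call its main() in-process with a patched sys.argv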
        if distributed:
            if n_gpus_to_use is None:
                n_gpus_to_use = get_gpu_count()
            master_port = get_torch_dist_unique_port()
            distributed_args = f"""
                -m torch.distributed.run
                --nproc_per_node={n_gpus_to_use}
                --master_port={master_port}
                {self.examples_dir_str}/pytorch/translation/run_translation.py
            """.split()
            cmd = [sys.executable] + distributed_args + args
            # keep for quick debug
            # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die
            execute_subprocess_async(cmd, env=self.get_env())
        else:
            testargs = ["run_translation.py"] + args
            with patch.object(sys, "argv", testargs):
                main()

        return output_dir