File size: 2,994 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import pytest
from itertools import product
import numpy as np
import torch

from ding.rl_utils import happo_data, happo_error, happo_error_continuous
from ding.rl_utils.ppo import shape_fn_ppo

# Option lists for each parametrized argument of the tests below.
use_value_clip_args = [True, False]
dual_clip_args = [None, 5.0]
# Per-sample weight drawn from [1, 2); ``None`` is the other weight option.
random_weight = 1 + torch.rand(4)
weight_args = [None, random_weight]
factor_args = [torch.rand(4, 1)]
# Cartesian product of all options: one tuple
# (use_value_clip, dual_clip, weight, factor) per test case.
args = list(product(use_value_clip_args, dual_clip_args, weight_args, factor_args))


@pytest.mark.unittest
def test_shape_fn_ppo():
    """
    Check that ``shape_fn_ppo`` returns the same shape whether the ``happo_data``
    is passed positionally or under the ``'data'`` keyword.
    """
    expected_shape = (3, 5, 8)
    # Only the first field is populated; the remaining eight are left as None.
    sample = happo_data(torch.randn(*expected_shape), *([None] * 8))
    positional_shape = shape_fn_ppo([sample], {})
    keyword_shape = shape_fn_ppo([], {'data': sample})
    assert positional_shape == keyword_shape == expected_shape


@pytest.mark.unittest
@pytest.mark.parametrize('use_value_clip, dual_clip, weight, factor', args)
def test_happo(use_value_clip, dual_clip, weight, factor):
    """
    Run ``happo_error`` on random logits, integer actions and values.

    Every loss term must have an empty shape and every info entry must be a
    scalar. The new logits and values carry no gradient before ``backward`` and
    a tensor gradient afterwards.
    """
    batch_size, action_num = 4, 32
    # The "old" tensors are the new ones shifted by a small non-negative noise.
    new_logit = torch.randn(batch_size, action_num).requires_grad_(True)
    old_logit = new_logit + 0.1 * torch.rand_like(new_logit)
    action = torch.randint(low=0, high=action_num, size=(batch_size, ))
    new_value = torch.randn(batch_size).requires_grad_(True)
    old_value = new_value + 0.1 * torch.rand_like(new_value)
    adv = torch.rand(batch_size)
    return_ = 2 * torch.randn(batch_size)

    data = happo_data(
        new_logit, old_logit, action, new_value, old_value, adv, return_, weight, factor
    )
    loss, info = happo_error(data, use_value_clip=use_value_clip, dual_clip=dual_clip)

    for term in loss:
        assert term.shape == ()
    for entry in info:
        assert np.isscalar(entry)

    # Nothing has been back-propagated yet.
    assert new_logit.grad is None
    assert new_value.grad is None

    sum(loss).backward()

    assert isinstance(new_logit.grad, torch.Tensor)
    assert isinstance(new_value.grad, torch.Tensor)


@pytest.mark.unittest
@pytest.mark.parametrize('use_value_clip, dual_clip, weight, factor', args)
def test_happo_error_continous(use_value_clip, dual_clip, weight, factor):
    """
    Run ``happo_error_continuous`` on random ``mu``/``sigma`` dicts, actions and values.

    Every loss term must have an empty shape and every info entry must be a
    scalar. ``mu`` and the new values carry no gradient before ``backward`` and
    a tensor gradient afterwards.
    """
    batch_size, action_dim = 4, 6
    mu = torch.rand(batch_size, action_dim).requires_grad_(True)
    sigma = torch.rand(batch_size, action_dim).requires_grad_(True)
    mu_sigma_new = {'mu': mu, 'sigma': sigma}
    # The "old" parameters are the new ones shifted by a small non-negative noise.
    old_mu = mu + 0.1 * torch.rand_like(mu)
    old_sigma = sigma + 0.1 * torch.rand_like(sigma)
    mu_sigma_old = {'mu': old_mu, 'sigma': old_sigma}
    action = torch.rand(batch_size, action_dim)
    new_value = torch.randn(batch_size).requires_grad_(True)
    old_value = new_value + 0.1 * torch.rand_like(new_value)
    adv = torch.rand(batch_size)
    return_ = 2 * torch.randn(batch_size)

    data = happo_data(
        mu_sigma_new, mu_sigma_old, action, new_value, old_value, adv, return_, weight, factor
    )
    loss, info = happo_error_continuous(data, use_value_clip=use_value_clip, dual_clip=dual_clip)

    for term in loss:
        assert term.shape == ()
    for entry in info:
        assert np.isscalar(entry)

    # Nothing has been back-propagated yet.
    assert mu_sigma_new['mu'].grad is None
    assert new_value.grad is None

    sum(loss).backward()

    assert isinstance(mu_sigma_new['mu'].grad, torch.Tensor)
    assert isinstance(new_value.grad, torch.Tensor)