import torch
from torchvision import utils
from collections import OrderedDict
import numpy as np
import matplotlib.cm as cm
import matplotlib as mpl


from .abs_model import abs_model
from .blocks import *  # wildcard import; supplies get_optimizer used below
from .SSN_v1 import SSN_v1
from .Loss.Loss import norm_loss, grad_loss
from .Attention_Unet import Attention_Unet

class Sparse_PH(abs_model):
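    """Training wrapper around an SSN_v1 or Attention_Unet backbone.

    The backbone predicts a two-channel (foreground/background) map that is
    supervised by a weighted combination of norm, gradient, gradient-L1 and
    Laplacian losses, with the weights taken from the experiment config.
    """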
    def __init__(self, opt):
        mid_act      = opt['model']['mid_act']
        out_act      = opt['model']['out_act']
        in_channels  = opt['model']['in_channels']
        out_channels = opt['model']['out_channels']
        resnet       = opt['model']['resnet']
        backbone     = opt['model']['backbone']

        self.ncols   = opt['hyper_params']['n_cols']
        self.focal   = opt['model']['focal']
        self.clip    = opt['model']['clip']

        self.norm_loss_  = opt['model']['norm_loss']
        self.grad_loss_  = opt['model']['grad_loss']
        self.ggrad_loss_ = opt['model']['ggrad_loss']
        self.lap_loss    = opt['model']['lap_loss']

        self.clip_range = opt['dataset']['linear_scale'] + opt['dataset']['linear_offset']

        if backbone == 'Default':
            self.model = SSN_v1(in_channels=in_channels,
                                out_channels=out_channels,
                                mid_act=mid_act,
                                out_act=out_act,
                                resnet=resnet)
        elif backbone == 'ATTN':
            self.model = Attention_Unet(in_channels, out_channels, mid_act=mid_act, out_act=out_act)
        else:
            raise ValueError('Unknown backbone: {}'.format(backbone))

        self.optimizer = get_optimizer(opt, self.model)
        self.visualization = {}

        self.norm_loss = norm_loss()
        self.grad_loss = grad_loss()


    def setup_input(self, x):
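        """Pass-through hook; input preprocessing for subclasses could go here."""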
        return x


    def forward(self, x):
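        """Run the backbone on a batched NxCxHxW tensor."""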
        return self.model(x)


    def compute_loss(self, y, pred):
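        """Weighted sum of norm, gradient, gradient-L1 and Laplacian losses."""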
        nloss   = self.norm_loss.loss(y, pred) * self.norm_loss_
        gloss   = self.grad_loss.loss(pred) * self.grad_loss_
        ggloss  = self.grad_loss.gloss(y, pred) * self.ggrad_loss_
        laploss = self.grad_loss.laploss(pred) * self.lap_loss

        total_loss = nloss + gloss + ggloss + laploss

        self.loss_log = {
            'norm_loss': nloss.item(),
            'grad_loss': gloss.item(),
            'grad_l1_loss': ggloss.item(),
            'lap_loss': laploss.item(),
        }

        if self.focal:
            # focal-style sharpening: cubing the loss emphasizes hard samples
            total_loss = torch.pow(total_loss, 3)

        return total_loss


    def supervise(self, input_x, y, is_training: bool) -> float:
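        """Run one forward pass (and a backward pass when training).

        Caches per-channel slices in self.visualization and returns the
        scalar loss value.
        """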
        optimizer = self.optimizer

        x = input_x['x']

        optimizer.zero_grad()
        pred = self.forward(x)
        if self.clip:
            pred = torch.clip(pred, 0.0, self.clip_range)

        loss = self.compute_loss(y, pred)
        if is_training:
            loss.backward()
            optimizer.step()

        xc = x.shape[1]
        for i in range(xc):
            self.visualization['x{}'.format(i)] = x[:, i:i+1].detach()

        self.visualization['y_fore']    = y[:, 0:1].detach()
        self.visualization['y_back']    = y[:, 1:2].detach()
        self.visualization['pred_fore'] = pred[:, 0:1].detach()
        self.visualization['pred_back'] = pred[:, 1:2].detach()

        return loss.item()


    def get_visualize(self) -> OrderedDict:
        """Convert the cached visualization tensors to color-mapped numpy grids."""
        ncols          = self.ncols
        visualizations = self.visualization
        ret_vis        = OrderedDict()

        for k, v in visualizations.items():
            batch = v.shape[0]
            n     = min(ncols, batch)

            plot_v = v[:n]
            # make_grid's nrow argument is the number of images per row
            ret_vis[k] = np.clip(utils.make_grid(plot_v.cpu(), nrow=ncols).numpy().transpose(1, 2, 0), 0.0, 1.0)
            ret_vis[k] = self.plasma(ret_vis[k])

        return ret_vis


    def get_logs(self):
        return self.loss_log


    def inference(self, x):
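        """Predict on a single HxWxC numpy image; returns an HxWxC numpy array."""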
        x, device = x['x'], x['device']
        x = torch.from_numpy(x.transpose((2,0,1))).unsqueeze(dim=0).float().to(device)
        pred = self.forward(x)

        pred = pred[0].detach().cpu().numpy().transpose((1,2,0))

        return pred


    def batch_inference(self, x):
        x = x['x']
        pred = self.forward(x)
        return pred


    """ Getter & Setter
    """
    def get_models(self) -> dict:
        return {'model': self.model}


    def get_optimizers(self) -> dict:
        return {'optimizer': self.optimizer}


    def set_models(self, models: dict):
        # input test
        if 'model' not in models:
            raise ValueError("'model' not found in models")

        self.model = models['model']


    def set_optimizers(self, optimizer: dict):
        self.optimizer = optimizer['optimizer']


    ####################
    # Personal Methods #
    ####################
    def plasma(self, x):
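        """Map channel 0 of an HxWxC array in [0, 1] to RGB via the plasma colormap."""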
        norm   = mpl.colors.Normalize(vmin=0.0, vmax=1.0)
        mapper = cm.ScalarMappable(norm=norm, cmap='plasma')
        bimg   = mapper.to_rgba(x[:, :, 0])[:, :, :3]

        return bimg
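

# ---------------------------------------------------------------------------
# Minimal usage sketch. The option values below are illustrative assumptions,
# not the project's real config: actual runs load these keys from the
# experiment config, and get_optimizer() may expect additional hyper_params
# (e.g. a learning rate). Run as a module (e.g. `python -m models.Sparse_PH`;
# the package path is hypothetical) so the relative imports resolve.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    opt = {
        'model': {
            'mid_act': 'relu', 'out_act': 'relu',
            'in_channels': 1, 'out_channels': 2,
            'resnet': False, 'backbone': 'Default',
            'focal': False, 'clip': True,
            'norm_loss': 1.0, 'grad_loss': 1.0,
            'ggrad_loss': 1.0, 'lap_loss': 1.0,
        },
        'hyper_params': {'n_cols': 4},
        'dataset': {'linear_scale': 1.0, 'linear_offset': 0.0},
    }
    model = Sparse_PH(opt)
    # One supervised step on random tensors: 1-channel input, 2-channel target.
    loss = model.supervise({'x': torch.rand(2, 1, 64, 64)},
                           torch.rand(2, 2, 64, 64),
                           is_training=True)
    print('loss:', loss)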