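"""Gym-style wrapper around the Beer Game supply-chain simulator (``clBeerGame``)
from ``dizoo.beergame.envs``: one of the four players is controlled by a learned
("srdqn") agent while the other three follow the configured 'bs' or 'Strm' policy.
"""
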
from __future__ import print_function
from dizoo.beergame.envs import clBeerGame
from torch import Tensor
import numpy as np
import random
from .utils import get_config, update_config
import gym
import os
from typing import Optional


class BeerGame:

    def __init__(self, role: int, agent_type: str, demandDistribution: int) -> None:
        # role: index (0-3) of the supply-chain player controlled by the learned ("srdqn") agent
        # agent_type: policy followed by the other three players, either 'bs' or 'Strm'
        # demandDistribution: selects the demand generator (see the options listed below)
        self._cfg, unparsed = get_config()
        self._role = role
        # prepare loggers and directories
        # prepare_dirs_and_logger(self._cfg)
        self._cfg = update_config(self._cfg)

        # set agent type
        if agent_type == 'bs':
            self._cfg.agentTypes = ["bs", "bs", "bs", "bs"]
        elif agent_type == 'Strm':
            self._cfg.agentTypes = ["Strm", "Strm", "Strm", "Strm"]
        self._cfg.agentTypes[role] = "srdqn"

        self._cfg.demandDistribution = demandDistribution

        # load demands: 0=uniform, 1=normal distribution, 2=the sequence 4,4,4,4,8,..., 3=basket data, 4=forecast data
        if self._cfg.observation_data:
            adsr = 'data/demandTr-obs-'
        elif self._cfg.demandDistribution == 3:
            if self._cfg.scaled:
                adsr = 'data/basket_data/scaled'
            else:
                adsr = 'data/basket_data'
            direc = os.path.realpath(adsr + '/demandTr-' + str(self._cfg.data_id) + '.npy')
            self._demandTr = np.load(direc)
            print("loaded training set=", direc)
        elif self._cfg.demandDistribution == 4:
            if self._cfg.scaled:
                adsr = 'data/forecast_data/scaled'
            else:
                adsr = 'data/forecast_data'
            direc = os.path.realpath(adsr + '/demandTr-' + str(self._cfg.data_id) + '.npy')
            self._demandTr = np.load(direc)
            print("loaded training set=", direc)
        else:
            if self._cfg.demandDistribution == 0:  # uniform
                self._demandTr = np.random.randint(0, self._cfg.demandUp, size=[self._cfg.demandSize, self._cfg.TUp])
            elif self._cfg.demandDistribution == 1:  # normal distribution
                self._demandTr = np.round(
                    np.random.normal(
                        self._cfg.demandMu, self._cfg.demandSigma, size=[self._cfg.demandSize, self._cfg.TUp]
                    )
                ).astype(int)
            elif self._cfg.demandDistribution == 2:  # the sequence of 4,4,4,4,8,...
                self._demandTr = np.concatenate(
                    (4 * np.ones((self._cfg.demandSize, 4)), 8 * np.ones((self._cfg.demandSize, 98))), axis=1
                ).astype(int)

        # initialize an instance of the Beer Game simulator
        self._env = clBeerGame(self._cfg)
        self.observation_space = gym.spaces.Box(
            low=float("-inf"),
            high=float("inf"),
            shape=(self._cfg.stateDim * self._cfg.multPerdInpt, ),
            dtype=np.float32
        )  # observation: the last m = multPerdInpt per-period state vectors, flattened (accounts for the reward delay)
        self.action_space = gym.spaces.Discrete(self._cfg.actionListLen)  # length of action list
        self.reward_space = gym.spaces.Box(low=float("-inf"), high=float("inf"), shape=(1, ), dtype=np.float32)

        # get the length of the demand.
        self._demand_len = np.shape(self._demandTr)[0]

    def reset(self):
        # start a new game with a demand sequence drawn uniformly at random from the training set
        self._env.resetGame(demand=self._demandTr[random.randint(0, self._demand_len - 1)])
        # flatten the controlled player's (m, state_dim) state history into a 1-D list
        obs = [i for item in self._env.players[self._role].currentState for i in item]
        return obs

    def seed(self, seed: int) -> None:
        self._seed = seed
        # seed both numpy and the stdlib RNG; reset() draws demand sequences with random.randint
        np.random.seed(self._seed)
        random.seed(self._seed)

    def close(self) -> None:
        pass

    def step(self, action: np.ndarray):
        # apply the agent's action and advance the simulator by one period
        self._env.handelAction(action)
        self._env.next()
        # slide the m-period observation window: drop the oldest per-period state, append the newest
        newstate = np.append(
            self._env.players[self._role].currentState[1:, :], [self._env.players[self._role].nextObservation], axis=0
        )
        self._env.players[self._role].currentState = newstate
        obs = [i for item in newstate for i in item]
        rew = self._env.players[self._role].curReward
        done = (self._env.curTime == self._env.T)
        info = {}
        return obs, rew, done, info

    def reward_shaping(self, reward: Tensor) -> Tensor:
        # add a share of the team-wide reward so the agent also accounts for the whole supply chain's cost
        self._totRew, self._cumReward = self._env.distTotReward(self._role)
        reward += (self._cfg.distCoeff / 3) * ((self._totRew - self._cumReward) / (self._env.T))
        return reward

    def enable_save_figure(self, figure_path: Optional[str] = None) -> None:
        # enable figure saving and run one test game so that plots are written to figure_path
        self._cfg.ifSaveFigure = True
        if figure_path is None:
            figure_path = './'
        self._cfg.figure_dir = figure_path
        self._env.doTestMid(self._demandTr[random.randint(0, self._demand_len - 1)])
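

# -----------------------------------------------------------------------------
# Minimal usage sketch: roll out a single episode with random actions.
# Assumptions: the role / agent_type / demandDistribution values below are
# illustrative only, and the config and data files read by get_config() /
# update_config() are available in the working directory.
if __name__ == '__main__':
    env = BeerGame(role=0, agent_type='bs', demandDistribution=0)
    env.seed(0)
    obs = env.reset()
    done = False
    episode_return = 0.0
    while not done:
        # sample a random index into the discrete action list; step() expects an ndarray
        action = np.array(env.action_space.sample())
        obs, rew, done, info = env.step(action)
        episode_return += rew
    print('episode return:', episode_return)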