File size: 4,979 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import sys
import os
import time
from ditk import logging
import argparse
import tempfile

from random import random
from string import ascii_lowercase
from ding.framework import Parallel

alphabet = [c.encode('ascii') for c in ascii_lowercase]


class EasyCounter:

    def __init__(self):
        self._last = None
        self._cnt = 0

    def add(self, item):
        self._last = item
        self._cnt += 1

    def cnt(self):
        return self._cnt

    def last(self):
        return self._last


class SockTest:

    # In this class, we define three processes except the main process,
    # which are receiver, testee, and sender.
    # The testee receive messages from the sender, and sends its own greeting
    # messages to the receiver periodically.
    # During the test, we breakdown the network of testee, and then find out
    # what happens to the testee.

    @classmethod
    def receiver(cls, epoch, interval):
        router = Parallel()
        greets = EasyCounter()
        router.on("greeting_receiver", lambda msg: greets.add(msg))
        start_t = time.time()
        logging.info("receiver start ...")

        for i in range(epoch):
            while time.time() - start_t < i * interval:
                time.sleep(0.01)

            if greets.cnt() == 0 or i % 10 != 0:
                continue
            last_msg = greets.last()
            msg_idx, msg_t = last_msg.split("_")[-2:]
            logging.info(
                "receiver passed {:.2f} s, received {} msgs. last msg: idx {}, time {} s".format(
                    time.time() - start_t, greets.cnt(), msg_idx, msg_t
                )
            )

        logging.info("receiver done! total msg: {}".format(greets.cnt()))

    @classmethod
    def testee(cls, epoch, interval, data_size):
        words = b''.join([alphabet[int(random() * 26)] for _ in range(1024 * 1024)]) * data_size
        print("msg length: {:.4f} MB".format(sys.getsizeof(words) / 1024 / 1024))

        router = Parallel()
        greets = EasyCounter()
        router.on("greeting_testee", lambda msg: greets.add(msg))
        start_t = time.time()
        logging.info("testee start ...")

        with tempfile.NamedTemporaryFile(prefix="pytmp_", dir="./") as itf:
            print("testee: write ip address to the tempfile:", itf.name)
            with open(itf.name, 'w') as ifd:
                ifd.write("{}\n".format(router.get_ip()))

            for i in range(epoch):
                while time.time() - start_t < i * interval:
                    time.sleep(0.01)

                if router._retries == 0:
                    router.emit("greeting_receiver", "{}_{}_{:.2f}".format(words, i, time.time() - start_t))
                elif router._retries == 1:
                    router.emit("greeting_receiver", "recovered_{}_{:.2f}".format(i, time.time() - start_t))
                else:
                    raise Exception("Failed too many times")

                if greets.cnt() == 0 or i % 10 != 0:
                    continue
                last_msg = greets.last()
                msg_idx, msg_t = last_msg.split("_")[-2:]
                logging.info(
                    "testee passed {:.2f} s, received {} msgs. last msg: idx {}, time {} s".format(
                        time.time() - start_t, greets.cnt(), msg_idx, msg_t
                    )
                )

        logging.info("testee done! total msg: {} retries: {}".format(greets.cnt(), router._retries))

    @classmethod
    def sender(cls, epoch, interval, data_size):
        words = b''.join([alphabet[int(random() * 26)] for _ in range(1024 * 1024)]) * data_size
        print("msg length: {:.4f} MB".format(sys.getsizeof(words) / 1024 / 1024))

        router = Parallel()
        start_t = time.time()
        logging.info("sender start ...")

        for i in range(epoch):
            while time.time() - start_t < i * interval:
                time.sleep(0.01)

            router.emit("greeting_testee", "{}_{}_{:.2f}".format(words, i, time.time() - start_t))

        logging.info("sender done!")

    @classmethod
    def main(cls, epoch=1000, interval=1.0, data_size=1, file="tmp_p1"):
        router = Parallel()
        if router.node_id == 0:
            cls.receiver(epoch, interval)
        elif router.node_id == 1:
            cls.testee(epoch, interval, data_size)
        elif router.node_id == 2:
            cls.sender(epoch, interval, data_size)
        else:
            raise Exception("Invalid node id")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--epoch', '-t', type=int, default=1200)
    parser.add_argument('--interval', '-i', type=float, default=0.1)
    parser.add_argument('--data_size', '-s', type=int, default=1)
    args = parser.parse_args()
    Parallel.runner(
        n_parallel_workers=3, protocol="tcp", topology="mesh", auto_recover=True, max_retries=1
    )(SockTest.main, args.epoch, args.interval, args.data_size)