File size: 4,979 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
import sys
import os
import time
from ditk import logging
import argparse
import tempfile
from random import random
from string import ascii_lowercase
from ding.framework import Parallel
alphabet = [c.encode('ascii') for c in ascii_lowercase]
class EasyCounter:
def __init__(self):
self._last = None
self._cnt = 0
def add(self, item):
self._last = item
self._cnt += 1
def cnt(self):
return self._cnt
def last(self):
return self._last
class SockTest:
# In this class, we define three processes except the main process,
# which are receiver, testee, and sender.
# The testee receive messages from the sender, and sends its own greeting
# messages to the receiver periodically.
# During the test, we breakdown the network of testee, and then find out
# what happens to the testee.
@classmethod
def receiver(cls, epoch, interval):
router = Parallel()
greets = EasyCounter()
router.on("greeting_receiver", lambda msg: greets.add(msg))
start_t = time.time()
logging.info("receiver start ...")
for i in range(epoch):
while time.time() - start_t < i * interval:
time.sleep(0.01)
if greets.cnt() == 0 or i % 10 != 0:
continue
last_msg = greets.last()
msg_idx, msg_t = last_msg.split("_")[-2:]
logging.info(
"receiver passed {:.2f} s, received {} msgs. last msg: idx {}, time {} s".format(
time.time() - start_t, greets.cnt(), msg_idx, msg_t
)
)
logging.info("receiver done! total msg: {}".format(greets.cnt()))
@classmethod
def testee(cls, epoch, interval, data_size):
words = b''.join([alphabet[int(random() * 26)] for _ in range(1024 * 1024)]) * data_size
print("msg length: {:.4f} MB".format(sys.getsizeof(words) / 1024 / 1024))
router = Parallel()
greets = EasyCounter()
router.on("greeting_testee", lambda msg: greets.add(msg))
start_t = time.time()
logging.info("testee start ...")
with tempfile.NamedTemporaryFile(prefix="pytmp_", dir="./") as itf:
print("testee: write ip address to the tempfile:", itf.name)
with open(itf.name, 'w') as ifd:
ifd.write("{}\n".format(router.get_ip()))
for i in range(epoch):
while time.time() - start_t < i * interval:
time.sleep(0.01)
if router._retries == 0:
router.emit("greeting_receiver", "{}_{}_{:.2f}".format(words, i, time.time() - start_t))
elif router._retries == 1:
router.emit("greeting_receiver", "recovered_{}_{:.2f}".format(i, time.time() - start_t))
else:
raise Exception("Failed too many times")
if greets.cnt() == 0 or i % 10 != 0:
continue
last_msg = greets.last()
msg_idx, msg_t = last_msg.split("_")[-2:]
logging.info(
"testee passed {:.2f} s, received {} msgs. last msg: idx {}, time {} s".format(
time.time() - start_t, greets.cnt(), msg_idx, msg_t
)
)
logging.info("testee done! total msg: {} retries: {}".format(greets.cnt(), router._retries))
@classmethod
def sender(cls, epoch, interval, data_size):
words = b''.join([alphabet[int(random() * 26)] for _ in range(1024 * 1024)]) * data_size
print("msg length: {:.4f} MB".format(sys.getsizeof(words) / 1024 / 1024))
router = Parallel()
start_t = time.time()
logging.info("sender start ...")
for i in range(epoch):
while time.time() - start_t < i * interval:
time.sleep(0.01)
router.emit("greeting_testee", "{}_{}_{:.2f}".format(words, i, time.time() - start_t))
logging.info("sender done!")
@classmethod
def main(cls, epoch=1000, interval=1.0, data_size=1, file="tmp_p1"):
router = Parallel()
if router.node_id == 0:
cls.receiver(epoch, interval)
elif router.node_id == 1:
cls.testee(epoch, interval, data_size)
elif router.node_id == 2:
cls.sender(epoch, interval, data_size)
else:
raise Exception("Invalid node id")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--epoch', '-t', type=int, default=1200)
parser.add_argument('--interval', '-i', type=float, default=0.1)
parser.add_argument('--data_size', '-s', type=int, default=1)
args = parser.parse_args()
Parallel.runner(
n_parallel_workers=3, protocol="tcp", topology="mesh", auto_recover=True, max_retries=1
)(SockTest.main, args.epoch, args.interval, args.data_size)
|