File size: 1,252 Bytes
079c32c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from time import sleep
import pytest
from ding.framework import EventLoop
from threading import Lock


@pytest.mark.unittest
def test_event_loop():
    loop = EventLoop.get_event_loop("test")
    try:
        counter = 0
        lock = Lock()

        def callback(n, lock):
            nonlocal counter
            with lock:
                counter += n

        # Test on
        loop.on("count", callback)

        for i in range(5):
            loop.emit("count", i, lock)
        sleep(0.1)
        assert counter == 10

        # Test off
        loop.off("count")
        loop.emit("count", 10, lock)
        sleep(0.1)
        assert counter == 10

        # Test once
        counter = 0
        loop.once("count", callback)
        loop.once("count", callback)
        loop.emit("count", 10, lock)
        sleep(0.1)
        assert counter == 20
        loop.emit("count", 10, lock)
        assert counter == 20

        # Test exception
        def except_callback():
            raise Exception("error")

        loop.on("error", except_callback)
        loop.emit("error")
        sleep(0.1)
        assert loop._exception is not None
        with pytest.raises(Exception):
            loop.emit("error")
    finally:
        loop.stop()