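# NOTE(review): the six lines originally here ("zjowowen's picture", "init space", "079c32c",
# "raw", "history blame", "8.51 kB") look like web file-viewer residue rather than source code;
# they are kept only as this comment so that the module remains valid Python.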
import pickle
from abc import abstractmethod, ABCMeta
from collections import deque
from threading import Lock
from typing import TypeVar, Iterable, List, Tuple, Union
from .time_ctl import BaseTime
# Type variable standing for the type of the data items handled by the classes below.
_Tp = TypeVar('_Tp')
class RangedData(metaclass=ABCMeta):
    """
    Overview:
        A time-windowed container. Every appended item is stamped with the value returned by \
        ``_get_time`` and stays in the active window while its timestamp is not older than \
        ``_get_time() - expire``. Of the items that have fallen out of the window, only the most \
        recently expired one is retained, so that ``current`` and ``history`` can still report the \
        value that was in effect at the start of the window. Every public operation acquires an \
        internal ``threading.Lock`` and flushes expired items first.
    Interfaces:
        ``__init__``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``, ``_get_time``.
    Properties:
        - expire (:obj:`float`): The expire time.
    """

    def __init__(self, expire: float, use_pickle: bool = False):
        """
        Overview:
            Initialize the RangedData object.
        Arguments:
            - expire (:obj:`float`): The expire time of the data; must be an int or float greater than 0.
            - use_pickle (:obj:`bool`): Whether to store items as ``pickle.dumps`` bytes and restore them \
                with ``pickle.loads`` on every read (readers then receive a fresh copy each time).
        Raises:
            - TypeError: If ``expire`` is neither int nor float.
            - ValueError: If ``expire`` is not greater than 0 (this includes NaN).
        """
        self.__expire = expire
        self.__use_pickle = use_pickle
        self.__check_expire()

        # Payload storage: data id -> item (or its pickled bytes), guarded by its own lock.
        self.__data_max_id = 0
        self.__data_items = {}
        self.__data_lock = Lock()

        # Most recently expired ``(time, data_id)`` pair, or None while nothing has expired.
        self.__last_item = None
        # Active window of ``(time, data_id)`` pairs, oldest on the left.
        self.__queue = deque()
        # Acquired by every public operation.
        self.__lock = Lock()

    def __check_expire(self):
        """
        Overview:
            Validate the expire time given to the constructor.
        Raises:
            - TypeError: If the expire time is neither int nor float.
            - ValueError: If the expire time is not greater than 0.
        """
        if not isinstance(self.__expire, (int, float)):
            raise TypeError(
                'Expire should be int or float, but {actual} found.'.format(actual=type(self.__expire).__name__)
            )
        # Spelled ``not (x > 0)`` rather than ``x <= 0`` so that NaN, which compares false against
        # everything, is rejected too; a NaN window would make every item expire immediately.
        if not self.__expire > 0:
            raise ValueError(
                "Expire should be greater than 0, but {actual} found.".format(actual=repr(self.__expire))
            )

    def __registry_data_item(self, data: _Tp) -> int:
        """
        Overview:
            Store a data item and allocate a new id for it.
        Arguments:
            - data (:obj:`_Tp`): The data item.
        Returns:
            - data_id (:obj:`int`): The id under which the item was stored.
        """
        with self.__data_lock:
            self.__data_max_id += 1
            _stored = pickle.dumps(data) if self.__use_pickle else data
            self.__data_items[self.__data_max_id] = _stored
            return self.__data_max_id

    def __get_data_item(self, data_id: int) -> _Tp:
        """
        Overview:
            Look up a stored data item by id.
        Arguments:
            - data_id (:obj:`int`): The data id.
        Returns:
            - data (:obj:`_Tp`): The stored item, unpickled first when ``use_pickle`` is enabled.
        """
        with self.__data_lock:
            _stored = self.__data_items[data_id]
            # Only bytes produced by ``__registry_data_item`` are ever unpickled here.
            return pickle.loads(_stored) if self.__use_pickle else _stored

    def __remove_data_item(self, data_id: int):
        """
        Overview:
            Drop a stored data item.
        Arguments:
            - data_id (:obj:`int`): The data id.
        """
        with self.__data_lock:
            del self.__data_items[data_id]

    def __check_time(self, time_: float):
        """
        Overview:
            Ensure a new timestamp is not earlier than the newest one in the active window.
        Arguments:
            - time_ (:obj:`float`): The time.
        Raises:
            - ValueError: If ``time_`` is less than the timestamp at the tail of the queue.
        """
        if self.__queue:
            _time, _ = self.__queue[-1]
            if time_ < _time:
                raise ValueError(
                    "Time {time} invalid for descending from last time {last_time}".format(
                        time=repr(time_), last_time=repr(_time)
                    )
                )

    def __append_item(self, time_: float, data: _Tp):
        """
        Overview:
            Store the data item and push its ``(time, data_id)`` pair onto the tail of the queue.
        Arguments:
            - time_ (:obj:`float`): The time.
            - data (:obj:`_Tp`): The data item.
        """
        self.__queue.append((time_, self.__registry_data_item(data)))

    def __flush_history(self):
        """
        Overview:
            Move every entry older than ``_get_time() - expire`` out of the active window. The newest \
            of the expired entries becomes ``__last_item``; the payload of the entry it replaces is removed.
        """
        _limit_time = self._get_time() - self.__expire
        # ``__check_time`` keeps the queue in non-descending time order, so expired entries can
        # only sit at the left end; peek at the head and pop it only once it is known to be stale.
        while self.__queue:
            _head_time, _head_id = self.__queue[0]
            if _head_time >= _limit_time:
                break

            self.__queue.popleft()
            if self.__last_item:
                _, _last_id = self.__last_item
                self.__remove_data_item(_last_id)
            self.__last_item = (_head_time, _head_id)

    def __append(self, time_: float, data: _Tp):
        """
        Overview:
            Validate the timestamp, append the data, then flush expired entries.
        Arguments:
            - time_ (:obj:`float`): The time.
            - data (:obj:`_Tp`): The data item.
        Raises:
            - ValueError: If ``time_`` is earlier than the newest queued timestamp.
        """
        self.__check_time(time_)
        self.__append_item(time_, data)
        self.__flush_history()

    def __current(self):
        """
        Overview:
            Get the newest data: the tail of the active window, or the retained expired item when \
            the window is empty.
        Returns:
            - data (:obj:`_Tp`): The newest data item.
        Raises:
            - ValueError: If neither the window nor the retained item holds anything.
        """
        if self.__queue:
            # Read the tail in place; the queue itself is left untouched.
            _, _tail_id = self.__queue[-1]
            return self.__get_data_item(_tail_id)
        if self.__last_item:
            _, _last_id = self.__last_item
            return self.__get_data_item(_last_id)
        raise ValueError("This range is empty.")

    def __history_yield(self):
        """
        Overview:
            Yield ``(time, data)`` pairs describing the window: first the retained expired item \
            (if any), stamped with the later of its own time and the window start; then every item \
            in the active window; finally the latest item once more at the current time when its \
            own timestamp is earlier than the current time.
        """
        _time = self._get_time()
        _limit_time = _time - self.__expire
        _latest_time, _latest_id = None, None

        if self.__last_item:
            _latest_time, _latest_id = _last_time, _last_id = self.__last_item
            yield max(_last_time, _limit_time), self.__get_data_item(_last_id)

        for _item_time, _item_id in self.__queue:
            _latest_time, _latest_id = _item_time, _item_id
            yield _item_time, self.__get_data_item(_item_id)

        if _latest_time is not None and _latest_time < _time:
            yield _time, self.__get_data_item(_latest_id)

    def __history(self):
        """
        Overview:
            Collect the pairs produced by ``__history_yield`` into a list.
        Returns:
            - history (:obj:`list`): List of ``(time, data)`` tuples.
        """
        return list(self.__history_yield())

    def append(self, data: _Tp):
        """
        Overview:
            Append one data item, stamped with the current time.
        Arguments:
            - data (:obj:`_Tp`): The data item.
        Returns:
            - self (:obj:`RangedData`): This object, so that calls can be chained.
        Raises:
            - ValueError: If the current time is earlier than the newest queued timestamp.
        """
        with self.__lock:
            self.__flush_history()
            _time = self._get_time()
            self.__append(_time, data)
            return self

    def extend(self, iter_: Iterable[_Tp]):
        """
        Overview:
            Append several data items; the time is read once and shared by all of them.
        Arguments:
            - iter_ (:obj:`Iterable[_Tp]`): The data items, appended in iteration order.
        Returns:
            - self (:obj:`RangedData`): This object, so that calls can be chained.
        Raises:
            - ValueError: If the time read is earlier than the newest queued timestamp.
        """
        with self.__lock:
            self.__flush_history()
            _time = self._get_time()
            for item in iter_:
                self.__append(_time, item)
            return self

    def current(self) -> _Tp:
        """
        Overview:
            Get the newest data item after flushing expired entries.
        Returns:
            - data (:obj:`_Tp`): The newest data item.
        Raises:
            - ValueError: If nothing has ever been appended.
        """
        with self.__lock:
            self.__flush_history()
            return self.__current()

    def history(self) -> List[Tuple[Union[int, float], _Tp]]:
        """
        Overview:
            Get the history of the window after flushing expired entries.
        Returns:
            - history (:obj:`List[Tuple[Union[int, float], _Tp]]`): ``(time, data)`` tuples: the retained \
                expired item (if any) stamped with the later of its own time and the window start, the \
                items of the active window, and the latest item repeated at the current time when its \
                timestamp is earlier than the current time.
        """
        with self.__lock:
            self.__flush_history()
            return self.__history()

    @property
    def expire(self) -> float:
        """
        Overview:
            Get the expire time. Like the other public accessors, this acquires the lock and \
            flushes expired entries before returning.
        Returns:
            - expire (:obj:`float`): The expire time passed to the constructor.
        """
        with self.__lock:
            self.__flush_history()
            return self.__expire

    def __bool__(self):
        """
        Overview:
            Check whether the range holds any data after flushing expired entries.
        Returns:
            - non_empty (:obj:`bool`): True if the active window is non-empty or an expired item is retained.
        """
        with self.__lock:
            self.__flush_history()
            return bool(self.__queue or self.__last_item)

    @abstractmethod
    def _get_time(self) -> float:
        """
        Overview:
            Get the current time. Subclasses must implement this; the value is used both to stamp \
            appended items and to compute the start of the window.
        Returns:
            - time (:obj:`float`): The current time.
        """
        raise NotImplementedError
class TimeRangedData(RangedData):
    """
    Overview:
        A ``RangedData`` whose clock is a ``BaseTime`` object: the current time is read from \
        ``time_.time()``.
    Interfaces:
        ``__init__``, ``_get_time``, ``append``, ``extend``, ``current``, ``history``, ``expire``, ``__bool__``.
    Properties:
        - time (:obj:`BaseTime`): The time.
        - expire (:obj:`float`): The expire time.
    """

    def __init__(self, time_: BaseTime, expire: float, use_pickle: bool = False):
        """
        Overview:
            Initialize the TimeRangedData object.
        Arguments:
            - time_ (:obj:`BaseTime`): The time object; its ``time()`` method supplies the current time.
            - expire (:obj:`float`): The expire time.
            - use_pickle (:obj:`bool`): Forwarded to ``RangedData.__init__``; defaults to False, which \
                matches the behavior before this parameter was exposed.
        """
        super().__init__(expire, use_pickle)
        self.__time = time_

    def _get_time(self) -> float:
        """
        Overview:
            Get the current time from the time object.
        Returns:
            - time (:obj:`float`): The value returned by ``time_.time()``.
        """
        return self.__time.time()

    @property
    def time(self) -> BaseTime:
        """
        Overview:
            Get the time object given to the constructor.
        Returns:
            - time (:obj:`BaseTime`): The time object.
        """
        return self.__time