import os import pymongo import datetime import time from .cloudService import GetConfig local_path = os.path.dirname(__file__) class DBUtils(GetConfig): """ 从安全的角度出发,将一些默认配置文件上传至COS中,接下来使用COS和它的子类的时候,在第一次使用时需要输入Cuny给的id和key 用于连接数据库等对象 当然,在db_default_download = False的时候,如果在运行路径下已经有配置文件了, 那么就不用再次下载了,也不用输入id和key 事实上这只需要运行一次,因为配置文件将会被下载至源码文件夹中 如果要自定义路径,请在继承的子类中编写__init__函数,将service_path定向到指定路径 """ __BASE_DIR: dict = None __PARAMS_DIR: dict = None db_base_path: str = f"{local_path}/conf/base_config.json" db_params_path: str = f"{local_path}/conf/params.json" db_default_download: bool = False @property def base_config(self): if self.__BASE_DIR is None: self.__BASE_DIR = self.load_json(self.db_base_path, self.db_default_download) return self.__BASE_DIR @property def db_config(self): return self.base_config["database_config"] @property def params_config(self): if self.__PARAMS_DIR is None: self.__PARAMS_DIR = self.load_json(self.db_params_path, self.db_default_download) return self.__PARAMS_DIR @property def size_dir(self): return self.params_config["size_config"] @property def func_dir(self): return self.params_config["func_config"] @property def wx_config(self): return self.base_config["wx_config"] def get_dbClient(self): return pymongo.MongoClient(self.db_config["connect_url"]) @staticmethod def get_time(yyyymmdd=None, delta_date=0): """ 给出当前的时间 :param yyyymmdd: 以yyyymmdd给出的日期时间 :param delta_date: 获取减去delta_day后的时间,默认为0就是当天 时间格式:yyyy_mm_dd """ if yyyymmdd is None: now_time = (datetime.datetime.now() - datetime.timedelta(delta_date)).strftime("%Y-%m-%d") return now_time # 输入了yyyymmdd的数据和delta_date,通过这两个数据返回距离yyyymmdd delta_date天的时间 pre_time = datetime.datetime(int(yyyymmdd[0:4]), int(yyyymmdd[4:6]), int(yyyymmdd[6:8])) return (pre_time - datetime.timedelta(delta_date)).strftime("%Y-%m-%d") # 获得时间戳 def get_timestamp(self, date_time:str=None) -> int: """ 输入的日期形式为:"2021-11-29 16:39:45.999" 真正必须输入的是前十个字符,及精确到日期,后面的时间可以不输入,不输入则默认置零 """ def standardDateTime(dt:str) -> str: """ 规范化时间字符串 """ if len(dt) < 10: raise ValueError("你必须至少输入准确到天的日期!比如:2021-11-29") elif len(dt) == 10: return dt + " 00:00:00.0" else: try: date, time = dt.split(" ") except ValueError: raise ValueError("你只能也必须在日期与具体时间之间增加一个空格,其他地方不能出现空格!") while len(time) < 10: if len(time) in (2, 5): time += ":" elif len(time) == 8: time += "." else: time += "0" return date + " " + time if date_time is None: # 默认返回当前时间(str), date_time精确到毫秒 date_time = datetime.datetime.now() # 转换成时间戳 else: date_time = standardDateTime(dt=date_time) date_time = datetime.datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S.%f") timestamp_ms = int(time.mktime(date_time.timetuple()) * 1000.0 + date_time.microsecond / 1000.0) return timestamp_ms @staticmethod def get_standardTime(yyyy_mm_dd: str): return yyyy_mm_dd[0:4] + yyyy_mm_dd[5:7] + yyyy_mm_dd[8:10] def find_oneDay_data(self, db_name: str, collection_name: str, date: str = None) -> dict: """ 获取指定天数的数据,如果date is None,就自动寻找距今最近的有数据的那一天的数据 """ df = None # 应该被返回的数据 collection = self.get_dbClient()[db_name][collection_name] if date is None: # 自动寻找前几天的数据,最多三十天 for delta_date in range(1, 31): date_yyyymmdd = self.get_standardTime(self.get_time(delta_date=delta_date)) filter_ = {"date": date_yyyymmdd} df = collection.find_one(filter=filter_) if df is not None: del df["_id"] break else: filter_ = {"date": date} df = collection.find_one(filter=filter_) if df is not None: del df["_id"] return df def find_daysData_byPeriod(self, date_period: tuple, db_name: str, col_name: str): # 给出一个指定的范围日期,返回相应的数据(日期的两头都会被寻找) # 这个函数我们默认数据库中的数据是连续的,即不会出现在 20211221 到 20211229 之间有一天没有数据的情况 if len(date_period) != 2: raise ValueError("date_period数据结构:(开始日期,截止日期)") start, end = date_period # yyyymmdd delta_date = int(end) - int(start) if delta_date < 0: raise ValueError("传入的日期有误!") collection = self.get_dbClient()[db_name][col_name] date = start while int(date) <= int(end): yield collection.find_one(filter={"date": date}) date = self.get_standardTime(self.get_time(date, -1)) @staticmethod def find_biggest_valueDict(dict_: dict): # 寻找字典中数值最大的字段,要求输入的字典的字段值全为数字 while len(dict_) > 0: max_value = 0 p = None for key in dict_: if dict_[key] > max_value: p = key max_value = dict_[key] yield p, max_value del dict_[p] def copy_andAdd_dict(self, dict_base, dict_): # 深度拷贝字典,将后者赋值给前者 # 如果后者的键名在前者已经存在,则直接相加。这就要求两者的数据是数值型 for key in dict_: if key not in dict_base: dict_base[key] = dict_[key] else: if isinstance(dict_[key], int) or isinstance(dict_[key], float): dict_base[key] = round(dict_[key] + dict_base[key], 2) else: dict_base[key] = self.copy_andAdd_dict(dict_base[key], dict_[key]) return dict_base @staticmethod def compare_data(dict1: dict, dict2: dict, suffix: str, save: int, **kwargs): """ 有两个字典,并且通过kwargs会传输一个新的字典,根据字典中的键值我们进行比对,处理成相应的数据格式 并且在dict1中,生成一个新的键值,为kwargs中的元素+suffix save:保留几位小数 """ new_dict = dict1.copy() for key in kwargs: try: if kwargs[key] not in dict2 or int(dict2[kwargs[key]]) == -1 or float(dict1[kwargs[key]]) <= 0.0: # 数据不存在 data_new = 5002 else: try: data_new = round( ((float(dict1[kwargs[key]]) - float(dict2[kwargs[key]])) / float(dict2[kwargs[key]])) * 100 , save) except ZeroDivisionError: data_new = 5002 if data_new == 0.0: data_new = 0 except TypeError as e: print(e) data_new = 5002 # 如果没有之前的数据,默认返回0 new_dict[kwargs[key] + suffix] = data_new return new_dict @staticmethod def sum_dictList_byKey(dictList: list, **kwargs) -> dict: """ 有一个列表,列表中的元素为字典,并且所有字典都有一个键值为key的字段,字段值为数字 我们将每一个字典的key字段提取后相加,得到该字段值之和. """ sum_num = {} if kwargs is None: raise ImportError("Please input at least ONE key") for key in kwargs: sum_num[kwargs[key]] = 0 for dict_ in dictList: if not isinstance(dict_, dict): raise TypeError("object is not DICT!") for key in kwargs: sum_num[kwargs[key]] += dict_[kwargs[key]] return sum_num @staticmethod def sum_2ListDict(list_dict1: list, list_dict2: list, key_name, data_name): """ 有两个列表,列表内的元素为字典,我们根据key所对应的键值寻找列表中键值相同的两个元素,将他们的data对应的键值相加 生成新的列表字典(其余键值被删除) key仅在一个列表中存在,则直接加入新的列表字典 """ sum_list = [] def find_sameKey(kn, key_, ld: list) -> int: for dic_ in ld: if dic_[kn] == key_: post_ = ld.index(dic_) return post_ return -1 for dic in list_dict1: key = dic[key_name] # 键名 post = find_sameKey(key_name, key, list_dict2) # 在list2中寻找相同的位置 data = dic[data_name] + list_dict2[post][data_name] if post != -1 else dic[data_name] sum_list.append({key_name: key, data_name: data}) return sum_list @staticmethod def find_biggest_dictList(dictList: list, key: str = "key", data: str = "value"): """ 有一个列表,里面每一个元素都是一个字典 这些字典有一些共通性质,那就是里面都有一个key键名和一个data键名,后者的键值必须是数字 我们根据data键值的大小进行生成,每一次返回列表中data键值最大的数和它的key键值 """ while len(dictList) > 0: point = 0 biggest_num = int(dictList[0][data]) biggest_key = dictList[0][key] for i in range(len(dictList)): num = int(dictList[i][data]) if num > biggest_num: point = i biggest_num = int(dictList[i][data]) biggest_key = dictList[i][key] yield str(biggest_key), biggest_num del dictList[point] def get_share_data(self, date_yyyymmdd: str): # 获得用户界面情况 visitPage = self.find_oneDay_data(date=date_yyyymmdd, db_name="cuny-user-analysis", collection_name="daily-userVisitPage") if visitPage is not None: # 这一部分没有得到数据是可以容忍的.不用抛出模态框错误 # 获得昨日用户分享情况 sum_num = self.sum_dictList_byKey(dictList=visitPage["data_list"], key1="page_share_pv", key2="page_share_uv") else: # 此时将分享次数等置为-1 sum_num = {"page_share_pv": -1, "page_share_uv": -1} return sum_num @staticmethod def compare_date(date1_yyyymmdd: str, date2_yyyymmdd: str): # 如果date1是date2的昨天,那么就返回True date1 = int(date1_yyyymmdd) date2 = int(date2_yyyymmdd) return True if date2 - date1 == 1 else False def change_time(self, date_yyyymmdd: str, mode: int): # 将yyyymmdd的数据分开为相应的数据形式 if mode == 1: if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is False: return date_yyyymmdd[0:4] + "年" + date_yyyymmdd[4:6] + "月" + date_yyyymmdd[6:8] + "日" else: return "昨日" elif mode == 2: date = date_yyyymmdd[0:4] + "." + date_yyyymmdd[4:6] + "." + date_yyyymmdd[6:8] if self.compare_date(date_yyyymmdd, self.get_standardTime(self.get_time(delta_date=0))) is True: return date + "~" + date + " | 昨日" else: return date + "~" + date @staticmethod def changeList_dict2List_list(dl: list, order: list): """ 列表内是一个个字典,本函数将字典拆解,以order的形式排列键值为列表 考虑到一些格式的问题,这里我采用生成器的形式封装 """ for dic in dl: # dic是列表内的字典元素 tmp = [] for key_name in order: key = dic[key_name] tmp.append(key) yield tmp def dict_mapping(self, dict_name: str, id_: str): """ 进行字典映射,输入字典名称和键名,返回具体的键值 如果不存在,则原路返回键名 """ try: return getattr(self, dict_name)[id_] except KeyError: return id_ except AttributeError: print(f"[WARNING]: 本对象内部不存在{dict_name}!") return id_ @staticmethod def dictAddKey(dic: dict, dic_tmp: dict, **kwargs): """ 往字典中加入参数,可迭代 """ for key in kwargs: dic[key] = dic_tmp[key] return dic if __name__ == "__main__": dbu = DBUtils()