import json
import os
import time

import pika
import requests

from config import get_config
from function import topic_clustering_mnews as tc

# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # uncomment to force CPU-only inference

config_params = get_config()
ConfigManager = config_params['ConfigManager']

# Timestamp of the last cache-expiry sweep; the sweep is rate-limited to once per hour.
last_time_check = time.time()


def update_result(result, type='daily', meta=None):
    """POST a clustering result back to the command-result endpoint.

    Args:
        result: JSON-serializable clustering output.
        type: unused legacy parameter, kept for caller compatibility.
              NOTE(review): shadows the builtin ``type``; safe to rename only
              after auditing external callers.
        meta: dict expected to contain "command_id" (set by callback_func).
    """
    # None-sentinel instead of a mutable default: the old `meta={}` default was
    # shared across calls and could leak state between invocations.
    meta = {} if meta is None else meta
    command_id = meta["command_id"]
    output = {
        "id": command_id,
        "status": 2,  # presumably "completed" — TODO confirm against API docs
        "json_result": json.dumps(result)
    }
    url = "https://staging.pontusinc.com/api/news_management/v1/quick_search/update-command-result"
    res = requests.post(url, json=output)
    print(res.text)
    print('Update result !!!!!!!!!')


def callback_func(ch, method, properties, body):
    """Consume one clustering request from the queue.

    The message body is a JSON object with keys: docs, top_cluster,
    top_sentence, topn_summary, hash_str, st_time, and optionally
    command_id and meta.  Results are cached on disk keyed by hash_str;
    cache entries older than 30 days are purged at most once per hour.
    Acks the message only after the full pipeline (or cache hit) finishes.
    """
    global last_time_check

    print("receive done: ")
    starttime = time.time()
    body = json.loads(body.decode("utf-8"))

    docs = body['docs']
    command_id = body.get("command_id", 0)
    print(command_id)
    # Fixed threshold; the per-request body['threshold'] is intentionally ignored.
    threshold = 0.1
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    hash_str = body['hash_str']
    st_time = body['st_time']
    meta = body.get('meta', {})
    meta["command_id"] = command_id

    is_cache = False

    # Load the cache index; a missing/corrupt index just means an empty cache.
    try:
        with open("log_mnews/log/log.txt") as f:
            data_dict = json.load(f)
    except Exception as ve:
        print(ve)
        data_dict = {}

    # Best-effort cache lookup: any failure (missing file, bad JSON, failed
    # POST) falls through to a full recompute below.
    try:
        if hash_str in data_dict:
            path_res = data_dict[hash_str]["response_path"]
            with open(path_res) as ff:
                results = json.load(ff)
            print("time analysis (cache): ", time.time() - st_time)
            update_result(results, meta=meta)
            is_cache = True
    except Exception as vee:
        print(vee)

    if not is_cache:
        results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster,
                                      top_sentence=top_sentence,
                                      topn_summary=topn_summary)
        update_result(results, meta=meta)
        # Persist the fresh result and register it in the cache index.
        path_res = "log_mnews/result_{0}.txt".format(hash_str)
        with open(path_res, "w+") as ff:
            ff.write(json.dumps(results))
        data_dict[hash_str] = {"time": st_time, "response_path": path_res}

    # Hourly sweep: drop cache-index entries older than 30 days.
    lst_rm = []
    if time.time() - last_time_check > 3600:
        print("check log to del .....")
        last_time_check = time.time()
        for dt in data_dict:
            if time.time() - data_dict[dt]["time"] > 30 * 24 * 3600:
                lst_rm.append(dt)
    for dt in lst_rm:
        del data_dict[dt]

    with open("log_mnews/log/log.txt", "w+") as ff:
        ff.write(json.dumps(data_dict))

    print("time analysis: ", time.time() - starttime)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def test():
    """Run the clustering pipeline once against a local fixture file."""
    with open('req_daily/aus.json') as f:
        body = json.load(f)

    docs = body['response']['docs']
    threshold = 0.25
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    meta = body['response'].get('meta', {})

    results = tc.topic_clustering(docs, threshold, top_cluster=top_cluster,
                                  top_sentence=top_sentence,
                                  topn_summary=topn_summary)
    update_result(results, meta=meta)
    print(123)


if __name__ == '__main__':
    params = ConfigManager['QueueConfigs']['queue_topic_clustering_mnews']
    usr_name = params["UserName"]
    password = str(params["Password"])
    host = params["HostName"]
    virtual_host = params["VirtualHost"]
    queue_name = params["Queue"]

    # Outer loop reconnects forever: any dropped connection or consumer error
    # tears down the channel and we rebuild from scratch.
    while True:
        try:
            credentials = pika.PlainCredentials(usr_name, password)
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, virtual_host=virtual_host,
                                          credentials=credentials,
                                          heartbeat=3600,
                                          blocked_connection_timeout=3600))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True,
                                  arguments={"x-max-priority": 10})
            print(" * wait message")
            # prefetch_count=1: one in-flight message at a time, since
            # clustering is long-running and we ack only on completion.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=queue_name,
                                  on_message_callback=callback_func)
            channel.start_consuming()
        except Exception as ex:
            # Was `print(f'[ERROR] ', ex)` — an f-string with no placeholder.
            print('[ERROR]', ex)
            # Brief backoff so a persistently unreachable broker does not
            # turn this loop into a busy spin.
            time.sleep(5)