import time import os import copy import concurrent.futures from pkgs import LangChainExecutor class QuotaManager(LangChainExecutor): def __init__(self, model_name, api_keys): """ Khởi tạo QuotaManager với danh sách các API key. :param model_name: Tên của mô hình LLM. #gpt-... pr gemini-... :param api_keys: Danh sách các API key. """ self.api_keys = api_keys self.current_key_index = 0 self.last_used_time = time.time() super().__init__(model_name) self.update_api_key() def update_api_key(self): """ Cập nhật API key hiện tại từ danh sách. """ self.api_key = self.api_keys[self.current_key_index] os.environ["GEMINI_API_KEY"] = self.api_key print(f"Đang sử dụng API key: {self.api_key}") def rotate_api_key(self): """ Xoay vòng sang API key tiếp theo khi API key hiện tại bị giới hạn. """ current_time = time.time() # Kiểm tra thời gian, nếu đã qua 60 giây kể từ lần sử dụng cuối, đặt lại chỉ số key if current_time - self.last_used_time >= 60: self.current_key_index = 0 else: # Chuyển sang API key tiếp theo self.current_key_index = (self.current_key_index + 1) % len(self.api_keys) self.update_api_key() self.last_used_time = current_time def execute(self, model_input, user_input, model_name="", temperature=0, prefix=None, infix=None, suffix=None, json_output=False): """ Thực thi yêu cầu với kiểm tra và thay đổi API key nếu cần thiết. :param model_input: Đầu vào cho mô hình. :param user_input: Đầu vào từ người dùng. :param model_name: Tên mô hình (tuỳ chọn). :param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình. :param prefix: Chuỗi tiền tố tuỳ chọn. :param infix: Chuỗi xen giữa tuỳ chọn. :param suffix: Chuỗi hậu tố tuỳ chọn. :param json_output: Cờ để xác định có trả về kết quả JSON hay không. :return: Kết quả từ mô hình. """ try: return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output) except Exception as e: if "Resource has been exhausted" in str(e): print("Lỗi:", e) print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...") self.rotate_api_key() return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output) else: print("Lỗi:", e) raise e def execute_with_image(self, model_input, user_input, base64_image, model_name="", temperature=0, prefix=None, infix=None, suffix=None, json_output=False): """ Thực thi yêu cầu với ảnh, kiểm tra và thay đổi API key nếu cần thiết. :param model_input: Đầu vào cho mô hình. :param user_input: Đầu vào từ người dùng. :param base64_image: Ảnh được mã hóa base64. :param model_name: Tên mô hình (tuỳ chọn). :param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình. :param prefix: Chuỗi tiền tố tuỳ chọn. :param infix: Chuỗi xen giữa tuỳ chọn. :param suffix: Chuỗi hậu tố tuỳ chọn. :param json_output: Cờ để xác định có trả về kết quả JSON hay không. :return: Kết quả từ mô hình. """ try: return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix, infix, suffix, json_output) except Exception as e: if "Resource has been exhausted" in str(e): print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...") self.rotate_api_key() return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix, infix, suffix, json_output) else: raise e def batch_execute(self, requests): """ Thực thi nhiều yêu cầu song song với việc kiểm tra và thay đổi API key nếu cần thiết. :param requests: Danh sách các yêu cầu. :return: Danh sách các phản hồi, tương ứng với từng yêu cầu. """ responses = [None] * len(requests) def process_request(index, request): model_input = request.get("model_input", "") user_input = request.get("user_input", "") prefix = request.get("prefix", None) infix = request.get("infix", None) suffix = request.get("suffix", None) model_name = request.get("model_name", self.model_name) temperature = request.get("temperature", 0) base64_image = request.get("base64_image", None) if base64_image: result = self.execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix, infix, suffix) else: result = self.execute(model_input, user_input, model_name, temperature, prefix, infix, suffix) responses[index] = result with concurrent.futures.ThreadPoolExecutor() as executor: futures = {executor.submit(process_request, i, request): i for i, request in enumerate(requests)} for future in concurrent.futures.as_completed(futures): index = futures[future] try: future.result() except Exception as exc: responses[index] = f"Exception occurred: {exc}" return responses