Describe_things / pkgs /quota_manager.py
ShynBui's picture
Update pkgs/quota_manager.py
aeb3778 verified
import time
import os
import copy
import concurrent.futures
from pkgs import LangChainExecutor
class QuotaManager(LangChainExecutor):
def __init__(self, model_name, api_keys):
"""
Khởi tạo QuotaManager với danh sách các API key.
:param model_name: Tên của mô hình LLM. #gpt-... pr gemini-...
:param api_keys: Danh sách các API key.
"""
self.api_keys = api_keys
self.current_key_index = 0
self.last_used_time = time.time()
super().__init__(model_name)
self.update_api_key()
def update_api_key(self):
"""
Cập nhật API key hiện tại từ danh sách.
"""
self.api_key = self.api_keys[self.current_key_index]
os.environ["GEMINI_API_KEY"] = self.api_key
print(f"Đang sử dụng API key: {self.api_key}")
def rotate_api_key(self):
"""
Xoay vòng sang API key tiếp theo khi API key hiện tại bị giới hạn.
"""
current_time = time.time()
# Kiểm tra thời gian, nếu đã qua 60 giây kể từ lần sử dụng cuối, đặt lại chỉ số key
if current_time - self.last_used_time >= 60:
self.current_key_index = 0
else:
# Chuyển sang API key tiếp theo
self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)
self.update_api_key()
self.last_used_time = current_time
def execute(self, model_input, user_input, model_name="", temperature=0, prefix=None, infix=None, suffix=None,
json_output=False):
"""
Thực thi yêu cầu với kiểm tra và thay đổi API key nếu cần thiết.
:param model_input: Đầu vào cho mô hình.
:param user_input: Đầu vào từ người dùng.
:param model_name: Tên mô hình (tuỳ chọn).
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
:param prefix: Chuỗi tiền tố tuỳ chọn.
:param infix: Chuỗi xen giữa tuỳ chọn.
:param suffix: Chuỗi hậu tố tuỳ chọn.
:param json_output: Cờ để xác định có trả về kết quả JSON hay không.
:return: Kết quả từ mô hình.
"""
try:
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output)
except Exception as e:
if "Resource has been exhausted" in str(e):
print("Lỗi:", e)
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
self.rotate_api_key()
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix,
json_output)
else:
print("Lỗi:", e)
raise e
def execute_with_image(self, model_input, user_input, base64_image, model_name="", temperature=0, prefix=None,
infix=None, suffix=None, json_output=False):
"""
Thực thi yêu cầu với ảnh, kiểm tra và thay đổi API key nếu cần thiết.
:param model_input: Đầu vào cho mô hình.
:param user_input: Đầu vào từ người dùng.
:param base64_image: Ảnh được mã hóa base64.
:param model_name: Tên mô hình (tuỳ chọn).
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
:param prefix: Chuỗi tiền tố tuỳ chọn.
:param infix: Chuỗi xen giữa tuỳ chọn.
:param suffix: Chuỗi hậu tố tuỳ chọn.
:param json_output: Cờ để xác định có trả về kết quả JSON hay không.
:return: Kết quả từ mô hình.
"""
try:
return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
infix, suffix, json_output)
except Exception as e:
if "Resource has been exhausted" in str(e):
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
self.rotate_api_key()
return self().execute_with_image(model_input, user_input, base64_image, model_name, temperature,
prefix, infix, suffix, json_output)
else:
raise e
def batch_execute(self, requests):
"""
Thực thi nhiều yêu cầu song song với việc kiểm tra và thay đổi API key nếu cần thiết.
:param requests: Danh sách các yêu cầu.
:return: Danh sách các phản hồi, tương ứng với từng yêu cầu.
"""
responses = [None] * len(requests)
def process_request(index, request):
model_input = request.get("model_input", "")
user_input = request.get("user_input", "")
prefix = request.get("prefix", None)
infix = request.get("infix", None)
suffix = request.get("suffix", None)
model_name = request.get("model_name", self.model_name)
temperature = request.get("temperature", 0)
base64_image = request.get("base64_image", None)
if base64_image:
result = self.execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
infix, suffix)
else:
result = self.execute(model_input, user_input, model_name, temperature, prefix, infix, suffix)
responses[index] = result
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(process_request, i, request): i for i, request in enumerate(requests)}
for future in concurrent.futures.as_completed(futures):
index = futures[future]
try:
future.result()
except Exception as exc:
responses[index] = f"Exception occurred: {exc}"
return responses