Spaces:
Running
Running
import time | |
import os | |
import copy | |
import concurrent.futures | |
from pkgs import LangChainExecutor | |
class QuotaManager(LangChainExecutor): | |
def __init__(self, model_name, api_keys): | |
""" | |
Khởi tạo QuotaManager với danh sách các API key. | |
:param model_name: Tên của mô hình LLM. #gpt-... pr gemini-... | |
:param api_keys: Danh sách các API key. | |
""" | |
self.api_keys = api_keys | |
self.current_key_index = 0 | |
self.last_used_time = time.time() | |
super().__init__(model_name) | |
self.update_api_key() | |
def update_api_key(self): | |
""" | |
Cập nhật API key hiện tại từ danh sách. | |
""" | |
self.api_key = self.api_keys[self.current_key_index] | |
os.environ["GEMINI_API_KEY"] = self.api_key | |
print(f"Đang sử dụng API key: {self.api_key}") | |
def rotate_api_key(self): | |
""" | |
Xoay vòng sang API key tiếp theo khi API key hiện tại bị giới hạn. | |
""" | |
current_time = time.time() | |
# Kiểm tra thời gian, nếu đã qua 60 giây kể từ lần sử dụng cuối, đặt lại chỉ số key | |
if current_time - self.last_used_time >= 60: | |
self.current_key_index = 0 | |
else: | |
# Chuyển sang API key tiếp theo | |
self.current_key_index = (self.current_key_index + 1) % len(self.api_keys) | |
self.update_api_key() | |
self.last_used_time = current_time | |
def execute(self, model_input, user_input, model_name="", temperature=0, prefix=None, infix=None, suffix=None, | |
json_output=False): | |
""" | |
Thực thi yêu cầu với kiểm tra và thay đổi API key nếu cần thiết. | |
:param model_input: Đầu vào cho mô hình. | |
:param user_input: Đầu vào từ người dùng. | |
:param model_name: Tên mô hình (tuỳ chọn). | |
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình. | |
:param prefix: Chuỗi tiền tố tuỳ chọn. | |
:param infix: Chuỗi xen giữa tuỳ chọn. | |
:param suffix: Chuỗi hậu tố tuỳ chọn. | |
:param json_output: Cờ để xác định có trả về kết quả JSON hay không. | |
:return: Kết quả từ mô hình. | |
""" | |
try: | |
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output) | |
except Exception as e: | |
if "Resource has been exhausted" in str(e): | |
print("Lỗi:", e) | |
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...") | |
self.rotate_api_key() | |
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, | |
json_output) | |
else: | |
print("Lỗi:", e) | |
raise e | |
def execute_with_image(self, model_input, user_input, base64_image, model_name="", temperature=0, prefix=None, | |
infix=None, suffix=None, json_output=False): | |
""" | |
Thực thi yêu cầu với ảnh, kiểm tra và thay đổi API key nếu cần thiết. | |
:param model_input: Đầu vào cho mô hình. | |
:param user_input: Đầu vào từ người dùng. | |
:param base64_image: Ảnh được mã hóa base64. | |
:param model_name: Tên mô hình (tuỳ chọn). | |
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình. | |
:param prefix: Chuỗi tiền tố tuỳ chọn. | |
:param infix: Chuỗi xen giữa tuỳ chọn. | |
:param suffix: Chuỗi hậu tố tuỳ chọn. | |
:param json_output: Cờ để xác định có trả về kết quả JSON hay không. | |
:return: Kết quả từ mô hình. | |
""" | |
try: | |
return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix, | |
infix, suffix, json_output) | |
except Exception as e: | |
if "Resource has been exhausted" in str(e): | |
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...") | |
self.rotate_api_key() | |
return self().execute_with_image(model_input, user_input, base64_image, model_name, temperature, | |
prefix, infix, suffix, json_output) | |
else: | |
raise e | |
def batch_execute(self, requests): | |
""" | |
Thực thi nhiều yêu cầu song song với việc kiểm tra và thay đổi API key nếu cần thiết. | |
:param requests: Danh sách các yêu cầu. | |
:return: Danh sách các phản hồi, tương ứng với từng yêu cầu. | |
""" | |
responses = [None] * len(requests) | |
def process_request(index, request): | |
model_input = request.get("model_input", "") | |
user_input = request.get("user_input", "") | |
prefix = request.get("prefix", None) | |
infix = request.get("infix", None) | |
suffix = request.get("suffix", None) | |
model_name = request.get("model_name", self.model_name) | |
temperature = request.get("temperature", 0) | |
base64_image = request.get("base64_image", None) | |
if base64_image: | |
result = self.execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix, | |
infix, suffix) | |
else: | |
result = self.execute(model_input, user_input, model_name, temperature, prefix, infix, suffix) | |
responses[index] = result | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
futures = {executor.submit(process_request, i, request): i for i, request in enumerate(requests)} | |
for future in concurrent.futures.as_completed(futures): | |
index = futures[future] | |
try: | |
future.result() | |
except Exception as exc: | |
responses[index] = f"Exception occurred: {exc}" | |
return responses | |