Spaces:
Running
Running
File size: 6,306 Bytes
ad17f03 aeb3778 ad17f03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import time
import os
import copy
import concurrent.futures
from pkgs import LangChainExecutor
class QuotaManager(LangChainExecutor):
def __init__(self, model_name, api_keys):
"""
Khởi tạo QuotaManager với danh sách các API key.
:param model_name: Tên của mô hình LLM. #gpt-... pr gemini-...
:param api_keys: Danh sách các API key.
"""
self.api_keys = api_keys
self.current_key_index = 0
self.last_used_time = time.time()
super().__init__(model_name)
self.update_api_key()
def update_api_key(self):
"""
Cập nhật API key hiện tại từ danh sách.
"""
self.api_key = self.api_keys[self.current_key_index]
os.environ["GEMINI_API_KEY"] = self.api_key
print(f"Đang sử dụng API key: {self.api_key}")
def rotate_api_key(self):
"""
Xoay vòng sang API key tiếp theo khi API key hiện tại bị giới hạn.
"""
current_time = time.time()
# Kiểm tra thời gian, nếu đã qua 60 giây kể từ lần sử dụng cuối, đặt lại chỉ số key
if current_time - self.last_used_time >= 60:
self.current_key_index = 0
else:
# Chuyển sang API key tiếp theo
self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)
self.update_api_key()
self.last_used_time = current_time
def execute(self, model_input, user_input, model_name="", temperature=0, prefix=None, infix=None, suffix=None,
json_output=False):
"""
Thực thi yêu cầu với kiểm tra và thay đổi API key nếu cần thiết.
:param model_input: Đầu vào cho mô hình.
:param user_input: Đầu vào từ người dùng.
:param model_name: Tên mô hình (tuỳ chọn).
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
:param prefix: Chuỗi tiền tố tuỳ chọn.
:param infix: Chuỗi xen giữa tuỳ chọn.
:param suffix: Chuỗi hậu tố tuỳ chọn.
:param json_output: Cờ để xác định có trả về kết quả JSON hay không.
:return: Kết quả từ mô hình.
"""
try:
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output)
except Exception as e:
if "Resource has been exhausted" in str(e):
print("Lỗi:", e)
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
self.rotate_api_key()
return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix,
json_output)
else:
print("Lỗi:", e)
raise e
def execute_with_image(self, model_input, user_input, base64_image, model_name="", temperature=0, prefix=None,
infix=None, suffix=None, json_output=False):
"""
Thực thi yêu cầu với ảnh, kiểm tra và thay đổi API key nếu cần thiết.
:param model_input: Đầu vào cho mô hình.
:param user_input: Đầu vào từ người dùng.
:param base64_image: Ảnh được mã hóa base64.
:param model_name: Tên mô hình (tuỳ chọn).
:param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
:param prefix: Chuỗi tiền tố tuỳ chọn.
:param infix: Chuỗi xen giữa tuỳ chọn.
:param suffix: Chuỗi hậu tố tuỳ chọn.
:param json_output: Cờ để xác định có trả về kết quả JSON hay không.
:return: Kết quả từ mô hình.
"""
try:
return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
infix, suffix, json_output)
except Exception as e:
if "Resource has been exhausted" in str(e):
print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
self.rotate_api_key()
return self().execute_with_image(model_input, user_input, base64_image, model_name, temperature,
prefix, infix, suffix, json_output)
else:
raise e
def batch_execute(self, requests):
"""
Thực thi nhiều yêu cầu song song với việc kiểm tra và thay đổi API key nếu cần thiết.
:param requests: Danh sách các yêu cầu.
:return: Danh sách các phản hồi, tương ứng với từng yêu cầu.
"""
responses = [None] * len(requests)
def process_request(index, request):
model_input = request.get("model_input", "")
user_input = request.get("user_input", "")
prefix = request.get("prefix", None)
infix = request.get("infix", None)
suffix = request.get("suffix", None)
model_name = request.get("model_name", self.model_name)
temperature = request.get("temperature", 0)
base64_image = request.get("base64_image", None)
if base64_image:
result = self.execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
infix, suffix)
else:
result = self.execute(model_input, user_input, model_name, temperature, prefix, infix, suffix)
responses[index] = result
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(process_request, i, request): i for i, request in enumerate(requests)}
for future in concurrent.futures.as_completed(futures):
index = futures[future]
try:
future.result()
except Exception as exc:
responses[index] = f"Exception occurred: {exc}"
return responses
|