File size: 6,306 Bytes
ad17f03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb3778
ad17f03
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import time
import os
import copy
import concurrent.futures
from pkgs import LangChainExecutor


class QuotaManager(LangChainExecutor):
    def __init__(self, model_name, api_keys):
        """
        Khởi tạo QuotaManager với danh sách các API key.

        :param model_name: Tên của mô hình LLM.  #gpt-... pr gemini-...
        :param api_keys: Danh sách các API key.
        """
        self.api_keys = api_keys
        self.current_key_index = 0
        self.last_used_time = time.time()
        super().__init__(model_name)
        self.update_api_key()

    def update_api_key(self):
        """
        Cập nhật API key hiện tại từ danh sách.
        """
        self.api_key = self.api_keys[self.current_key_index]
        os.environ["GEMINI_API_KEY"] = self.api_key
        print(f"Đang sử dụng API key: {self.api_key}")

    def rotate_api_key(self):
        """
        Xoay vòng sang API key tiếp theo khi API key hiện tại bị giới hạn.
        """
        current_time = time.time()

        # Kiểm tra thời gian, nếu đã qua 60 giây kể từ lần sử dụng cuối, đặt lại chỉ số key
        if current_time - self.last_used_time >= 60:
            self.current_key_index = 0
        else:
            # Chuyển sang API key tiếp theo
            self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)

        self.update_api_key()
        self.last_used_time = current_time

    def execute(self, model_input, user_input, model_name="", temperature=0, prefix=None, infix=None, suffix=None,
                json_output=False):
        """
        Thực thi yêu cầu với kiểm tra và thay đổi API key nếu cần thiết.

        :param model_input: Đầu vào cho mô hình.
        :param user_input: Đầu vào từ người dùng.
        :param model_name: Tên mô hình (tuỳ chọn).
        :param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
        :param prefix: Chuỗi tiền tố tuỳ chọn.
        :param infix: Chuỗi xen giữa tuỳ chọn.
        :param suffix: Chuỗi hậu tố tuỳ chọn.
        :param json_output: Cờ để xác định có trả về kết quả JSON hay không.
        :return: Kết quả từ mô hình.
        """
        try:
            return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix, json_output)
        except Exception as e:
            if "Resource has been exhausted" in str(e):
                print("Lỗi:", e)
                print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
                self.rotate_api_key()
                return super().execute(model_input, user_input, model_name, temperature, prefix, infix, suffix,
                                       json_output)
            else:
                print("Lỗi:", e)
                raise e

    def execute_with_image(self, model_input, user_input, base64_image, model_name="", temperature=0, prefix=None,
                           infix=None, suffix=None, json_output=False):
        """
        Thực thi yêu cầu với ảnh, kiểm tra và thay đổi API key nếu cần thiết.

        :param model_input: Đầu vào cho mô hình.
        :param user_input: Đầu vào từ người dùng.
        :param base64_image: Ảnh được mã hóa base64.
        :param model_name: Tên mô hình (tuỳ chọn).
        :param temperature: Nhiệt độ điều chỉnh tính ngẫu nhiên của mô hình.
        :param prefix: Chuỗi tiền tố tuỳ chọn.
        :param infix: Chuỗi xen giữa tuỳ chọn.
        :param suffix: Chuỗi hậu tố tuỳ chọn.
        :param json_output: Cờ để xác định có trả về kết quả JSON hay không.
        :return: Kết quả từ mô hình.
        """
        try:
            return super().execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
                                              infix, suffix, json_output)
        except Exception as e:
            if "Resource has been exhausted" in str(e):
                print(f"API key bị giới hạn: {self.api_key}. Đang chuyển sang API key khác...")
                self.rotate_api_key()
                return self().execute_with_image(model_input, user_input, base64_image, model_name, temperature,
                                                  prefix, infix, suffix, json_output)
            else:
                raise e

    def batch_execute(self, requests):
        """
        Thực thi nhiều yêu cầu song song với việc kiểm tra và thay đổi API key nếu cần thiết.

        :param requests: Danh sách các yêu cầu.
        :return: Danh sách các phản hồi, tương ứng với từng yêu cầu.
        """
        responses = [None] * len(requests)

        def process_request(index, request):
            model_input = request.get("model_input", "")
            user_input = request.get("user_input", "")
            prefix = request.get("prefix", None)
            infix = request.get("infix", None)
            suffix = request.get("suffix", None)
            model_name = request.get("model_name", self.model_name)
            temperature = request.get("temperature", 0)
            base64_image = request.get("base64_image", None)

            if base64_image:
                result = self.execute_with_image(model_input, user_input, base64_image, model_name, temperature, prefix,
                                                 infix, suffix)
            else:
                result = self.execute(model_input, user_input, model_name, temperature, prefix, infix, suffix)

            responses[index] = result

        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(process_request, i, request): i for i, request in enumerate(requests)}

            for future in concurrent.futures.as_completed(futures):
                index = futures[future]
                try:
                    future.result()
                except Exception as exc:
                    responses[index] = f"Exception occurred: {exc}"

        return responses