File size: 7,956 Bytes
1a0cf07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import datetime
import json
import logging
import os
import re
from abc import ABC, abstractmethod

import dirtyjson
import hjson
import numpy as np
import openai
from fuzzywuzzy import process
from sklearn.metrics.pairwise import cosine_similarity

# Connection settings per provider: (provider name, environment variable
# holding the API key, base URL). Keys are read from the environment when
# this module is imported and are None if the variable is not set.
_PROVIDER_ENDPOINTS = (
    ("SambaNova", "SAMBANOVA_API_KEY", "https://api.sambanova.ai/v1"),
    ("Together", "TOGETHER_API_KEY", "https://api.together.xyz/v1"),
    # You can add more API configurations here for other providers
)

# Maps provider name -> {"api_key": ..., "url_base": ...}.
api_configs = {
    provider: {"api_key": os.environ.get(env_var), "url_base": base_url}
    for provider, env_var, base_url in _PROVIDER_ENDPOINTS
}

class Agent(ABC):
    """Abstract base for agents built from a prompt template and an LLM client.

    Concrete subclasses must override :meth:`generate_response`; the base
    class cannot be instantiated directly.
    """

    def __init__(self, prompt_template, llm_client):
        """Keep references to the given template and client on the instance."""
        self.llm_client = llm_client
        self.prompt_template = prompt_template

    @abstractmethod
    def generate_response(self, prompt):
        """Produce a response for ``prompt``; the base version returns None."""
        return None


def setup_logging():
    """Configure file and console logging and return the ``SwiftSage`` logger.

    The root logger is configured via ``logging.basicConfig`` to write
    INFO-level records to ``logs/swiftsage_log_<YYYYmmdd_HHMMSS>.txt``
    (truncating mode), and an INFO-level console ``StreamHandler`` is added
    to the root logger. The ``logs`` directory is created if it is missing.

    Returns:
        logging.Logger: the logger named ``'SwiftSage'``.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"logs/swiftsage_log_{timestamp}.txt"

    # basicConfig opens the log file right away and fails with
    # FileNotFoundError when the target directory does not exist yet.
    os.makedirs(os.path.dirname(log_filename), exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        filename=log_filename,
        filemode='w'
    )

    # Also print to console
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

    return logging.getLogger('SwiftSage')

 

def extract_and_parse_markup(text):
    """Collect the contents of known ``<tag>...</tag>`` sections from ``text``.

    For each recognised tag, the first (non-greedy, newline-spanning) match
    is stripped of surrounding whitespace and stored under the tag name.
    Tags that do not occur are simply absent from the result. A ``code``
    section additionally has Markdown code fences removed.

    Returns:
        dict: tag name -> extracted string.
    """
    tag_names = (
        "reasoning_steps",
        "final_answer",
        "feedback",
        "score",
        "critical_feedback",
        "revised_plan",
        "solved",
        "plan",
        "code",
    )

    # An opened but never closed <final_answer> is closed at the very end of
    # the text so that the section can still be matched below.
    if "<final_answer>" in text and "</final_answer>" not in text:
        text += "</final_answer>"

    sections = {}
    for tag in tag_names:
        found = re.search(f'<{tag}>(.*?)</{tag}>', text, re.DOTALL)
        if found is None:
            continue
        sections[tag] = found.group(1).strip()

    if "code" in sections:
        without_fences = sections["code"].replace("```python", "").replace("```", "")
        sections["code"] = without_fences.strip()

    return sections


class PromptTemplate:
    """Load the swift/sage/reward prompt templates and fill in placeholders.

    The three template files are read from ``template_dir`` when the object
    is constructed and kept in ``self.templates``, keyed by the part of the
    filename before the first underscore (``swift``, ``sage``, ``reward``).
    """

    def __init__(self, template_dir):
        """Remember ``template_dir`` and load all templates from it."""
        self.template_dir = template_dir
        self.templates = {}
        self.load_templates()

    def load_templates(self):
        """Read every template file into ``self.templates``.

        Raises:
            OSError: if a template file is missing or cannot be read.
        """
        for filename in ['swift_template.md', 'sage_template.md', 'reward_template.md']:
            path = os.path.join(self.template_dir, filename)
            # Read as UTF-8 explicitly so the result does not depend on the
            # platform's default locale encoding.
            with open(path, 'r', encoding='utf-8') as f:
                key = filename.split('_')[0]
                self.templates[key] = f.read()

    def format(self, key, **kwargs):
        """Return template ``key`` with each ``<name>`` placeholder filled in.

        Every keyword argument ``name=value`` replaces all occurrences of
        ``<name>`` with ``str(value)``. Substitution happens in one pass over
        the template, so placeholder-looking text inside a substituted value
        is left as-is rather than being replaced by another argument.

        Returns:
            str: the filled-in template, or ``""`` when ``key`` is unknown.
        """
        template = self.templates.get(key, "")
        if not kwargs:
            return template

        replacements = {"<" + k + ">": str(v) for k, v in kwargs.items()}
        placeholder = re.compile("|".join(re.escape(p) for p in replacements))
        # A function replacement keeps backslashes in values literal.
        return placeholder.sub(lambda m: replacements[m.group(0)], template)


class LLMClient:
    """Small wrapper around a chat-completions endpoint reached via ``openai.OpenAI``.

    The client is built from an ``api_config`` mapping and sends every
    request with the sampling settings fixed at construction time.
    """

    def __init__(self, model_id, api_config, temperature=0.3, top_p=1.0, max_tokens=3000, logger=None):
        """Create the underlying client and store the request settings.

        Args:
            model_id: value sent as ``model`` with each request.
            api_config: mapping providing ``'api_key'`` and ``'url_base'``.
            temperature: forwarded unchanged to the completion call.
            top_p: forwarded unchanged to the completion call.
            max_tokens: forwarded unchanged to the completion call.
            logger: optional logger for request/response messages; when
                ``None``, the ``'SwiftSage'`` logger is used instead.
        """
        self.client = openai.OpenAI(
            api_key=api_config['api_key'],
            base_url=api_config['url_base']
        )
        self.model_id = model_id
        self.temperature = temperature
        self.top_p = top_p
        self.max_tokens = max_tokens
        self.logger = logger

    def _active_logger(self):
        """Return the configured logger, or the ``'SwiftSage'`` logger if none was given."""
        if self.logger is not None:
            return self.logger
        return logging.getLogger('SwiftSage')

    def generate_response(self, messages):
        """Send ``messages`` to the model and return the first choice's text.

        Args:
            messages: chat messages passed through as ``messages`` to
                ``chat.completions.create``.

        Returns:
            The ``message.content`` of the first choice in the response.
        """
        # Resolved per call so a missing logger (the default) no longer
        # raises AttributeError on the first log statement.
        log = self._active_logger()
        log.info("Sending request to %s", self.model_id)
        log.info("Messages: %s", messages)
        response = self.client.chat.completions.create(
            model=self.model_id,
            messages=messages,
            temperature=self.temperature,
            top_p=self.top_p,
            max_tokens=self.max_tokens
        )
        content = response.choices[0].message.content
        log.info("Response from %s:\n%s", self.model_id, content)
        return content





if __name__ == "__main__":
    # Manual smoke test: parse a sample string and print what was extracted.
    sample = "test"
    print(extract_and_parse_markup(sample))



"""

def extract_and_parse_json(text):

    keys_and_types = [
        ("reasoning_steps", list),
        ("final_answer", str),
        ("feedback", str),
        ("score", str),
        ("score", int),
        ("feedback", str),
        ("solved", str),
        ("critical_feedback", str),
        ("revised_plan", list),
    ]

    # Try to parse the JSON first
    try:
        # find the first and last curly braces and parse the json
        first_brace = text.find("{")
        last_brace = text.rfind("}")  
        if last_brace == -1:
            text = text + "\"}"
        if first_brace != -1 and last_brace != -1 and first_brace < last_brace:
            data = json.loads(text[first_brace:last_brace+1])
        return data
    except Exception as e:
        data = {}
        try: 
            data = dirtyjson.loads(text) 
        except Exception as e:
            pass
        # If JSON parsing fails, use regex to extract key-value pairs
        
        for key, _ in keys_and_types:
            # pattern = rf'"{key}"\s*:\s*([\[{{].*?[\]}}]|".*?")'
            pattern = rf'"{key}"\s*:\s*([\[{{].*?[\]}}]|".*?"|[-+]?\d+)'
            match = re.search(pattern, text, re.DOTALL)
            if match:
                try:
                    value = json.loads(match.group(1))
                except Exception as e:
                    value = match.group(1).strip('"')
                data[key] = value

    result = {}
    for key, expected_type in keys_and_types:
        if key in result.keys() and result[key] is not None:
            continue
        # Use fuzzy matching to find the closest key
        try:
            closest_key, score = process.extractOne(key, data.keys())
        except Exception as e:
            continue
        if score > 80:  # You can adjust this threshold
            value = data[closest_key]
            
            # Type checking and conversion
            if expected_type == list and isinstance(value, str):
                value = [item.strip() for item in value.strip('[]').split(',')]
            elif expected_type == str and isinstance(value, list):
                value = ', '.join(value)
            elif expected_type == int and value is not None:
                try:
                    value = int(value)
                except ValueError:
                    value = None
            
            result[key] = value
        else:
            result[key] = None 
    
    for key in list(result.keys()):
        if result[key] is None:
            del result[key]
    return result

def extract_and_parse_json_v1(text):
    def find_json_objects(s):
        # Find all substrings that look like JSON objects
        json_like_strs = re.findall(r'\{(?:[^{}]|\{[^{}]*\})*\}', s)
        return json_like_strs

    def try_parse_json(s):
        try:
            return json.loads(s)
        except json.JSONDecodeError:
            try:
                s = s.replace("\n", "")
                return hjson.loads(s)
            except json.JSONDecodeError:
                return None
            return None 

    # First, try to find JSON within code blocks
    code_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```'
    code_blocks = re.findall(code_block_pattern, text, re.IGNORECASE)
    
    all_json_candidates = []
    
    # Add JSON candidates from code blocks
    for block in code_blocks:
        all_json_candidates.extend(find_json_objects(block))
    
    # Add JSON candidates from the entire text
    all_json_candidates.extend(find_json_objects(text))
    
    # Sort candidates by length, descending
    all_json_candidates.sort(key=len, reverse=True)
    
    # Try to parse each candidate
    for candidate in all_json_candidates:
        parsed_json = try_parse_json(candidate)
        if parsed_json is not None:
            return parsed_json
    
    raise ValueError("No valid JSON object found in the text")

 
 

"""