"""Gradio front end for a Mixtral-backed chat assistant with simple project tools."""

import contextlib
import functools
import io
import logging
import os
import random
import subprocess
import time
from datetime import datetime
from typing import Any, Dict, List, Tuple

import gradio as gr
from huggingface_hub import InferenceClient
from safe_search import safe_search
from i_search import google, i_search as i_s
from transformers import AutoModelForCausalLM, AutoTokenizer
import prompts

client = InferenceClient(
    "mistralai/Mixtral-8x7B-Instruct-v0.1"
)

# --- Configuration ---
VERBOSE = True
MAX_HISTORY = 5
MAX_TOKENS = 2048
TEMPERATURE = 0.7
TOP_P = 0.8
REPETITION_PENALTY = 1.5
MODEL_NAME = "mistralai/Mixtral-8x7B-Instruct-v0.1"
API_KEY = os.getenv("HUGGINGFACE_API_KEY")

# --- Logging Setup ---
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# --- Agents ---
agents = [
    "WEB_DEV",
    "AI_SYSTEM_PROMPT",
    "PYTHON_CODE_DEV",
    "DATA_SCIENCE",
    "UI_UX_DESIGN",
]

# --- Prompts ---
# The template texts below are kept exactly as they appear in the source file.
PREFIX = " {date_time_str} Purpose: {purpose} Safe Search: {safe_search} "

LOG_PROMPT = " PROMPT: {content} "

LOG_RESPONSE = " RESPONSE: {resp} "

COMPRESS_HISTORY_PROMPT = (
    " You are a helpful AI assistant. Your task is to compress the following"
    " history into a summary that is no longer than 512 tokens."
    " History: {history} "
)

ACTION_PROMPT = (
    " You are a helpful AI assistant. You are working on the task: {task}"
    " Your current history is: {history}"
    " What is your next thought? thought:"
    " What is your next action? action: "
)

TASK_PROMPT = (
    " You are a helpful AI assistant."
    " Your current history is: {history}"
    " What is the next task? task: "
)

UNDERSTAND_TEST_RESULTS_PROMPT = (
    " You are a helpful AI assistant."
    " The test results are: {test_results}"
    " What do you want to know about the test results? \n"
    "thought: "
)


# --- Functions ---
def format_prompt(
    message: str,
    history: List[Tuple[str, str]],
    max_history_turns: int = 2,
) -> str:
    """Format ``message`` and the most recent history turns as an [INST] prompt.

    Args:
        message: The new user message.
        history: Previous ``(user_prompt, bot_response)`` pairs, oldest first.
        max_history_turns: How many trailing history pairs to include. Zero or
            a negative value includes no history.

    Returns:
        The prompt string: a leading space, one ``[INST] ... [/INST] ...``
        segment per included turn, then the new message.
    """
    # ``history[-0:]`` would be the whole list, so a zero limit needs a guard.
    recent_turns = history[-max_history_turns:] if max_history_turns > 0 else []
    segments = [
        f"[INST] {user_prompt} [/INST] {bot_response} "
        for user_prompt, bot_response in recent_turns
    ]
    return " " + "".join(segments) + f"[INST] {message} [/INST]"


def _build_content(purpose: str, body: str) -> str:
    """Return ``body`` prefixed with the timestamped PREFIX header."""
    header = PREFIX.format(
        date_time_str=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        purpose=purpose,
        # NOTE(review): this formats the imported ``safe_search`` object
        # itself; if it is a callable, its repr ends up in the prompt.
        # Confirm that this is intended.
        safe_search=safe_search,
    )
    return header + body


def run_llm(
    prompt_template: str,
    stop_tokens: List[str],
    purpose: str,
    **prompt_kwargs: Any,
) -> str:
    """Run the hosted LLM on a formatted prompt template.

    Args:
        prompt_template: A ``str.format`` template for the prompt body.
        stop_tokens: Sequences at which generation should stop.
        purpose: Text inserted into the PREFIX header.
        **prompt_kwargs: Values substituted into ``prompt_template``.

    Returns:
        The generated text.
    """
    seed = random.randint(1, 1111111111111111)
    logging.info("Seed: %s", seed)

    content = _build_content(purpose, prompt_template.format(**prompt_kwargs))
    if VERBOSE:
        logging.info(LOG_PROMPT.format(content=content))

    client = InferenceClient(model=MODEL_NAME, token=API_KEY)
    resp = client.text_generation(
        content,
        max_new_tokens=MAX_TOKENS,
        stop_sequences=stop_tokens,
        temperature=TEMPERATURE,
        top_p=TOP_P,
        repetition_penalty=REPETITION_PENALTY,
        # Pass the logged seed so the logged value matches the one used.
        seed=seed,
    )
    if VERBOSE:
        logging.info(LOG_RESPONSE.format(resp=resp))

    # With the default ``details=False`` the client returns a plain string,
    # which has no ``.text`` attribute.
    return resp


def generate(
    prompt: str,
    history: List[Tuple[str, str]],
    agent_name: str = agents[0],
    sys_prompt: str = "",
    temperature: float = TEMPERATURE,
    max_new_tokens: int = MAX_TOKENS,
    top_p: float = TOP_P,
    repetition_penalty: float = REPETITION_PENALTY,
) -> str:
    """Generate a response from the hosted LLM.

    Args:
        prompt: The already formatted prompt text.
        history: Accepted for signature compatibility; not read here.
        agent_name: Agent label inserted into the PREFIX purpose line.
        sys_prompt: Text placed before ``prompt``, separated by a newline.
        temperature: Sampling temperature sent to the endpoint.
        max_new_tokens: Maximum number of tokens to generate.
        top_p: Nucleus sampling threshold.
        repetition_penalty: Penalty applied to repeated tokens.

    Returns:
        The generated text.
    """
    content = _build_content(
        f"Generating response as {agent_name}",
        sys_prompt + "\n" + prompt,
    )
    if VERBOSE:
        logging.info(LOG_PROMPT.format(content=content))

    client = InferenceClient(model=MODEL_NAME, token=API_KEY)
    response = client.text_generation(
        content,
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
    )
    if VERBOSE:
        logging.info(LOG_RESPONSE.format(resp=response))

    # ``text_generation`` returns a plain string unless ``details=True``.
    return response


# --- Mixtral Integration ---
@functools.lru_cache(maxsize=1)
def _load_mixtral():
    """Load the tokenizer and model once and reuse them for every request."""
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)
    return tokenizer, model


def mixtral_generate(
    prompt: str,
    history: List[Tuple[str, str]],
    agent_name: str = agents[0],
    sys_prompt: str = "",
    temperature: float = TEMPERATURE,
    max_new_tokens: int = MAX_TOKENS,
    top_p: float = TOP_P,
    repetition_penalty: float = REPETITION_PENALTY,
) -> str:
    """Generate a response with a locally loaded Mixtral model.

    Args:
        prompt: The already formatted prompt text.
        history: Accepted for signature compatibility; not read here.
        agent_name: Agent label inserted into the PREFIX purpose line.
        sys_prompt: Text placed before ``prompt``, separated by a newline.
        temperature: Sampling temperature; a value of 0 selects greedy
            decoding.
        max_new_tokens: Maximum number of tokens to generate (at least 1).
        top_p: Nucleus sampling threshold, used only when sampling.
        repetition_penalty: Penalty applied to repeated tokens.

    Returns:
        The newly generated text, without the echoed prompt.
    """
    tokenizer, model = _load_mixtral()
    content = _build_content(
        f"Generating response as {agent_name}",
        sys_prompt + "\n" + prompt,
    )
    inputs = tokenizer(content, return_tensors="pt")

    generation_kwargs: Dict[str, Any] = {
        # UI sliders may deliver floats or zero; generate() needs a positive int.
        "max_new_tokens": max(1, int(max_new_tokens)),
        "repetition_penalty": repetition_penalty,
    }
    if temperature > 0:
        # temperature/top_p only take effect when sampling is enabled.
        generation_kwargs.update(
            do_sample=True, temperature=temperature, top_p=top_p
        )

    outputs = model.generate(**inputs, **generation_kwargs)

    # The output sequence starts with the prompt tokens; decode only the rest.
    prompt_length = inputs["input_ids"].shape[-1]
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)


def main():
    """Build the Gradio interface, wire its event handlers, and launch it."""
    with gr.Blocks() as demo:
        gr.Markdown("## FragMixt: The No-Code Development Powerhouse")
        gr.Markdown("### Your AI-Powered Development Companion")

        with gr.Row():
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(
                    show_label=False,
                    show_share_button=False,
                    show_copy_button=True,
                    likeable=True,
                    layout="panel",
                )
                message = gr.Textbox(
                    label="Enter your message", placeholder="Ask me anything!"
                )
                submit_button = gr.Button(value="Send")

            with gr.Column(scale=1):
                purpose = gr.Textbox(
                    label="Purpose",
                    placeholder="What is the purpose of this interaction?",
                )
                agent_name = gr.Dropdown(
                    label="Agents",
                    choices=list(agents),
                    value=agents[0],
                    interactive=True,
                )
                sys_prompt = gr.Textbox(
                    label="System Prompt", max_lines=1, interactive=True
                )
                temperature = gr.Slider(
                    label="Temperature",
                    value=TEMPERATURE,
                    minimum=0.0,
                    maximum=1.0,
                    step=0.05,
                    interactive=True,
                    info="Higher values produce more diverse outputs",
                )
                max_new_tokens = gr.Slider(
                    label="Max new tokens",
                    value=MAX_TOKENS,
                    minimum=0,
                    maximum=1048 * 10,
                    step=64,
                    interactive=True,
                    info="The maximum numbers of new tokens",
                )
                top_p = gr.Slider(
                    label="Top-p (nucleus sampling)",
                    value=TOP_P,
                    minimum=0.0,
                    maximum=1,
                    step=0.05,
                    interactive=True,
                    info="Higher values sample more low-probability tokens",
                )
                repetition_penalty = gr.Slider(
                    label="Repetition penalty",
                    value=REPETITION_PENALTY,
                    minimum=1.0,
                    maximum=2.0,
                    step=0.05,
                    interactive=True,
                    info="Penalize repeated tokens",
                )

        with gr.Tabs():
            with gr.TabItem("Project Explorer"):
                project_path = gr.Textbox(
                    label="Project Path",
                    placeholder="/home/user/app/current_project",
                )
                explore_button = gr.Button(value="Explore")
                project_output = gr.Textbox(label="File Tree", lines=20)

            with gr.TabItem("Code Editor"):
                code_editor = gr.Code(label="Code Editor", language="python")
                run_code_button = gr.Button(value="Run Code")
                code_output = gr.Textbox(label="Code Output", lines=10)

            with gr.TabItem("File Management"):
                file_list = gr.Dropdown(
                    label="Select File", choices=[], interactive=True
                )
                file_content = gr.Textbox(label="File Content", lines=20)
                save_file_button = gr.Button(value="Save File")
                # Named here, inside the tab, instead of being created
                # anonymously in the event wiring below.
                new_file_name = gr.Textbox(label="New File Name")
                create_file_button = gr.Button(value="Create New File")
                delete_file_button = gr.Button(value="Delete File")
                file_status = gr.Textbox(label="Status", interactive=False)

        history = gr.State([])

        def chat(
            purpose: str,
            message: str,
            agent_name: str,
            sys_prompt: str,
            temperature: float,
            max_new_tokens: int,
            top_p: float,
            repetition_penalty: float,
            history: List[Tuple[str, str]],
        ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
            """Generate a reply to ``message`` and return the updated history.

            The same list is returned twice: once for the chatbot display and
            once for the stored state. ``purpose`` is accepted from the UI but
            is not used when building the prompt.
            """
            prompt = format_prompt(message, history)
            # Use Mixtral for generation
            response = mixtral_generate(
                prompt,
                history,
                agent_name,
                sys_prompt,
                temperature,
                max_new_tokens,
                top_p,
                repetition_penalty,
            )
            updated_history = history + [(message, response)]
            return updated_history, updated_history

        submit_button.click(
            chat,
            inputs=[
                purpose,
                message,
                agent_name,
                sys_prompt,
                temperature,
                max_new_tokens,
                top_p,
                repetition_penalty,
                history,
            ],
            outputs=[chatbot, history],
        )

        def explore_project(project_path: str) -> str:
            """Return the output of the ``tree`` command for ``project_path``."""
            try:
                tree = subprocess.check_output(["tree", project_path]).decode("utf-8")
                return tree
            except Exception as e:
                return f"Error exploring project: {e}"

        explore_button.click(
            explore_project, inputs=[project_path], outputs=[project_output]
        )

        def run_code(code: str) -> str:
            """Execute ``code`` and return whatever it printed to stdout."""
            captured = io.StringIO()
            try:
                # NOTE(security): exec() runs whatever is typed into the editor
                # inside this server process; only expose this UI to trusted
                # users.
                # redirect_stdout swaps sys.stdout for the whole process, so
                # output printed by other threads during the run is captured
                # too.
                with contextlib.redirect_stdout(captured):
                    exec(code, {})
            except Exception as e:
                return f"Error running code: {e}"
            return captured.getvalue()

        run_code_button.click(
            run_code, inputs=[code_editor], outputs=[code_output]
        )

        def load_file_list(project_path: str):
            """Return a Dropdown update listing the regular files in the directory."""
            try:
                files = [
                    f
                    for f in os.listdir(project_path)
                    if os.path.isfile(os.path.join(project_path, f))
                ]
            except Exception as e:
                # This runs on every edit of the path box, so a half-typed path
                # is expected; log it and show an empty list.
                logging.warning("Error loading file list: %s", e)
                files = []
            # A bare list would be treated as the Dropdown's value; an update
            # object is needed to replace its choices.
            return gr.update(choices=files)

        def load_file_content(project_path: str, file_name: str) -> str:
            """Return the text of the selected file, or an error message."""
            if not file_name:
                # Nothing selected yet.
                return ""
            try:
                with open(
                    os.path.join(project_path, file_name), "r", encoding="utf-8"
                ) as file:
                    return file.read()
            except Exception as e:
                return f"Error loading file content: {e}"

        def save_file(project_path: str, file_name: str, content: str) -> str:
            """Write ``content`` to the selected file and return a status message."""
            try:
                with open(
                    os.path.join(project_path, file_name), "w", encoding="utf-8"
                ) as file:
                    file.write(content)
                return f"File {file_name} saved successfully."
            except Exception as e:
                return f"Error saving file: {e}"

        def create_file(project_path: str, file_name: str) -> str:
            """Create the file if it does not exist and return a status message."""
            try:
                # Append mode creates the file without truncating an existing one.
                open(os.path.join(project_path, file_name), "a").close()
                return f"File {file_name} created successfully."
            except Exception as e:
                return f"Error creating file: {e}"

        def delete_file(project_path: str, file_name: str) -> str:
            """Delete the selected file and return a status message."""
            try:
                os.remove(os.path.join(project_path, file_name))
                return f"File {file_name} deleted successfully."
            except Exception as e:
                return f"Error deleting file: {e}"

        project_path.change(
            load_file_list, inputs=[project_path], outputs=[file_list]
        )
        file_list.change(
            load_file_content,
            inputs=[project_path, file_list],
            outputs=[file_content],
        )
        save_file_button.click(
            save_file,
            inputs=[project_path, file_list, file_content],
            outputs=[file_status],
        )
        create_file_button.click(
            create_file,
            inputs=[project_path, new_file_name],
            outputs=[file_status],
        )
        delete_file_button.click(
            delete_file,
            inputs=[project_path, file_list],
            outputs=[file_status],
        )

    demo.launch()


if __name__ == "__main__":
    main()