File size: 3,706 Bytes
7aa6874 0133f13 f87e8b4 7aa6874 fc82ea1 7aa6874 f87e8b4 dd9a7ec 7aa6874 0133f13 cf287d2 a2e28f9 c87a829 7aa6874 0133f13 7aa6874 752352d a52bed3 d8fdcda 7aa6874 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
import gradio as gr
import datetime, os, threading
import base64, json
from autogen import ConversableAgent, AssistantAgent
from autogen.coding import LocalCommandLineCodeExecutor
def read_file(file_path: str) -> str:
    """Read a text file and return its full contents.

    The file is opened in text mode and decoded as UTF-8.
    """
    with open(file_path, mode="r", encoding="utf-8") as source:
        contents = source.read()
    return contents
def read_image_file(image_file_path: str) -> str:
    """Return the raw bytes of a file as a Base64-encoded text string."""
    with open(image_file_path, mode="rb") as source:
        raw_bytes = source.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def generate_markdown_image(image_data: str) -> str:
    """Build a Markdown image tag that embeds Base64 data as a PNG data URI."""
    template = "![Image](data:image/png;base64,{})"
    return template.format(image_data)
def format_as_markdown(code: str) -> str:
    """Wrap ``code`` in a fenced Markdown code block (no language tag)."""
    fence = '```'
    # Opening fence, the code, closing fence, each on its own line.
    return "\n".join((fence, code, fence))
def get_latest_file(directory, file_extension):
    """Return the name of the most recently modified file with an extension.

    Args:
        directory: Directory whose direct entries are scanned.
        file_extension: Extension to match exactly, including the leading
            dot (for example ".png"). The comparison is case-sensitive.

    Returns:
        The bare entry name (not the full path) with the newest modification
        time among the matches, or None when nothing matches.
    """

    def modified_at(entry_name):
        # Modification time of the entry as a local datetime.
        full_path = os.path.join(directory, entry_name)
        return datetime.datetime.fromtimestamp(os.path.getmtime(full_path))

    candidates = [
        entry_name
        for entry_name in os.listdir(directory)
        if entry_name and os.path.splitext(entry_name)[1] == file_extension
    ]
    # max() keeps the first of several equally new entries; the default
    # covers the "no matching file" case.
    return max(candidates, key=modified_at, default=None)
def run_multi_agent(llm, task):
    """Run a two-agent coding chat and return the outcome as Markdown.

    A code-writer agent configured with model ``llm`` converses with a
    code-executor agent that runs code through a local command-line executor
    in the ``coding`` working directory. The chat (minus its first message)
    is rendered as Markdown, preceded by the most recently modified PNG found
    in that directory when there is one.

    Args:
        llm: Model name placed in the writer agent's ``llm_config``.
        task: Initial message the executor agent sends to the writer agent.

    Returns:
        Markdown text: an inline PNG image (if any) followed by the chat
        transcript. When no PNG exists, only the transcript is returned.
    """
    # Single source of truth for the directory the executor works in and the
    # directory that is searched for the generated image.
    work_dir = "coding"

    llm_config = {"model": llm}
    executor = LocalCommandLineCodeExecutor(
        timeout=60,
        work_dir=work_dir,
    )
    # Executor side: no LLM, only runs code and never asks for human input.
    code_executor_agent = ConversableAgent(
        name="code_executor_agent",
        llm_config=False,
        code_execution_config={"executor": executor},
        human_input_mode="NEVER",
        default_auto_reply="TERMINATE",
    )
    # Writer side: LLM-backed, code execution disabled.
    code_writer_agent = AssistantAgent(
        name="code_writer_agent",
        llm_config=llm_config,
        code_execution_config=False,
        human_input_mode="NEVER",
    )
    chat_result = code_executor_agent.initiate_chat(
        code_writer_agent,
        message=task,
        max_turns=10
    )

    # Render every message except the first one (the task itself). Role names
    # are relabelled: 'assistant' -> 'Code Executor', 'user' -> 'Code Writer'.
    parts = []
    for index, message in enumerate(chat_result.chat_history):
        if index == 0:
            continue
        parts.append(
            f"**{message['role'].replace('assistant', 'Code Executor').replace('user', 'Code Writer')}**\n{message['content']}\n\n"
        )
    chat = "".join(parts)

    # NOTE(review): the newest PNG is chosen by modification time only, so it
    # may be a leftover from an earlier run if the directory is not cleaned.
    file_name_png = get_latest_file(work_dir, ".png")
    if file_name_png is None:
        # No image was produced: return the transcript alone instead of
        # trying to open a file literally named "None".
        result = chat
    else:
        # Read from the same directory that was searched, rather than from a
        # hard-coded absolute path that only exists on one deployment.
        image_data = read_image_file(os.path.join(work_dir, file_name_png))
        markdown_code_png = generate_markdown_image(image_data)
        result = f"{markdown_code_png}\n\n{chat}"

    print("===")
    print(result)
    print("===")
    return result
# Serializes invoke() calls: each call writes the caller's key into the
# process-wide os.environ["OPENAI_API_KEY"] while it runs.
lock = threading.Lock()
# Model name that invoke() passes to run_multi_agent().
LLM = "gpt-4o-mini"
def invoke(openai_api_key, openai_api_base, task):
    """Validate the inputs and run the multi-agent task.

    Args:
        openai_api_key: API key exported as the OPENAI_API_KEY environment
            variable for the duration of the run.
        openai_api_base: Accepted as part of the signature; this function
            does not use it.
        task: Task text passed to run_multi_agent.

    Returns:
        The Markdown string returned by run_multi_agent.

    Raises:
        gr.Error: If the API key or the task is empty.
    """
    if not openai_api_key:
        raise gr.Error("OpenAI API Key is required.")
    if not task:
        raise gr.Error("Task is required.")
    # raise gr.Error("Please clone space due to local code execution.")

    # The key is stored in process-wide os.environ, so runs are serialized.
    with lock:
        os.environ["OPENAI_API_KEY"] = openai_api_key
        try:
            return run_multi_agent(LLM, task)
        finally:
            # Always remove the caller's key, even when the run fails, so it
            # does not linger in the environment after this call.
            os.environ.pop("OPENAI_API_KEY", None)
gr.close_all()


def _invoke_from_form(openai_api_key, task):
    """Adapt the two form fields to invoke(), which takes three arguments.

    The form has no API-base field, so None is passed for that parameter.
    """
    return invoke(openai_api_key, None, task)


# The INPUT, OUTPUT and DESCRIPTION environment variables are required;
# a missing one raises KeyError while the interface is being built.
demo = gr.Interface(
    fn=_invoke_from_form,
    inputs=[
        gr.Textbox(label="OpenAI API Key", type="password", lines=1),
        gr.Textbox(label="Task", value=f"Today is {datetime.date.today()}. {os.environ['INPUT']}"),
    ],
    outputs=[
        gr.Markdown(label="Output", value=os.environ["OUTPUT"], line_breaks=True, sanitize_html=False),
    ],
    title="Multi-Agent AI: Coding",
    description=os.environ["DESCRIPTION"],
)
demo.launch()