# Spaces: Sleeping  (web-page status banner captured together with the source; not program code)
import os | |
from dotenv import load_dotenv | |
import time | |
from datetime import datetime | |
from skills.skill_registry import SkillRegistry | |
from tasks.task_registry import TaskRegistry | |
from ongoing_tasks import ongoing_tasks | |
# Read the local .env file so its entries are visible through os.getenv.
load_dotenv()

# API credentials taken from the environment; os.getenv yields None for a
# variable that is not set.
api_keys = {
    'openai': os.getenv('OPENAI_API_KEY'),
    'serpapi': os.getenv('SERPAPI_API_KEY'),
    # 'airtable': os.getenv('AIRTABLE_API_KEY'),
}

# Objective used when this module is run as a script.
OBJECTIVE = "Research Yohei Nakajima and write a poem about him."

# Skill names handed to the SkillRegistry.
# Add web_search and documentation_search once SERPAPI_API_KEY is in your
# secrets; add airtable_search once AIRTABLE_API_KEY is set and the
# base/table/column data has been filled in inside airtable_search.py.
LOAD_SKILLS = [
    'web_search',
    'text_completion',
    'code_reader',
    'google_jobs_api_search',
    'image_generation',
    'startup_analysis',
    'play_music',
    'game_generation',
]

# Experimental: run a reflection step after each task when executing a task list.
REFLECTION = False
def run_single_task(task_id, task, skill_registry, task_outputs, OBJECTIVE, task_registry):
    """Execute a single task and record its result.

    Args:
        task_id: Key of the task in ``task_outputs`` and in the task registry.
        task: Task mapping; its ``description`` and ``skill`` entries are
            copied into ``task_outputs`` (with fallback text when absent).
        skill_registry: Forwarded to ``task_registry.execute_task``. Its
            ``skills`` mapping is also used to build the skill description
            string for the reflection step.
        task_outputs: Per-task bookkeeping dict, mutated in place. It must
            already hold an entry for ``task_id``.
        OBJECTIVE: Overall objective, forwarded to ``execute_task``.
        task_registry: Registry that runs the task and stores task status.

    The module-level ``REFLECTION`` flag decides whether the reflection step
    runs after a task that produced output.
    """
    task_output = task_registry.execute_task(task_id, task, skill_registry, task_outputs, OBJECTIVE)

    # The entry is flagged completed even when the task returned nothing, so
    # the caller's "all completed" check can still terminate.
    entry = task_outputs[task_id]
    entry["output"] = task_output
    entry["completed"] = True
    entry["description"] = task.get('description', 'No description available')
    entry["skill"] = task.get('skill', 'No skill information available')

    if not task_output:
        return

    task_registry.update_tasks({"id": task_id, "status": "completed", "result": task_output})
    completed_task = task_registry.get_task(task_id)
    print(f"Task #{task_id}: {completed_task.get('task')} [COMPLETED][{completed_task.get('skill')}]")

    if not REFLECTION:
        return

    # Build the description string here: it is not a module-level name, so
    # relying on one raised NameError whenever reflection was switched on.
    skill_descriptions = ",".join(
        f"[{skill.name}: {skill.description}]" for skill in skill_registry.skills.values()
    )
    new_tasks, insert_after_ids, tasks_to_update = task_registry.reflect_on_output(task_output, skill_descriptions)
    for new_task, after_id in zip(new_tasks, insert_after_ids):
        task_registry.add_task(new_task, after_id)

    # A single update may arrive as a bare dict; normalise it to a list.
    if isinstance(tasks_to_update, dict) and tasks_to_update:
        tasks_to_update = [tasks_to_update]
    # "or []" tolerates None / empty results from the reflection step.
    for task_to_update in tasks_to_update or []:
        task_registry.update_tasks(task_to_update)
def run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION=False):
    """Create a task list for OBJECTIVE, run every task, and return the last output.

    Args:
        OBJECTIVE: Objective text used to create the task list and passed to
            every task and to the final reflection calls.
        LOAD_SKILLS: Accepted for interface compatibility; not read here (the
            skills come from the module-level ``global_skill_registry``).
        api_keys: Accepted for interface compatibility; not read here.
        REFLECTION: Accepted for interface compatibility; not read here
            (``run_single_task`` consults the module-level flag instead).

    Returns:
        The output of the last task in the task list, or a string starting
        with ``"An error occurred:"`` when any exception was raised.

    Side effects:
        Prints progress, sleeps briefly between scheduling passes, and writes
        a session summary to ``output/output_<timestamp>.txt``.
    """
    try:
        skills = list(global_skill_registry.skills.values())
        skill_descriptions = ",".join(f"[{skill.name}: {skill.description}]" for skill in skills)
        task_registry = TaskRegistry()
        task_registry.create_tasklist(OBJECTIVE, skill_descriptions)

        skill_names = [skill.name for skill in skills]
        session_summary = f"OBJECTIVE:{OBJECTIVE}.#SKILLS:{','.join(skill_names)}.#"

        task_outputs = {task["id"]: {"completed": False, "output": None} for task in task_registry.get_tasks()}
        task_output = None  # Stays None when the task list is empty.

        while not all(entry["completed"] for entry in task_outputs.values()):
            tasks = task_registry.get_tasks()
            task_registry.print_tasklist(tasks)

            # Tasks may have been added since the last pass; give them an entry.
            for task in tasks:
                task_outputs.setdefault(task["id"], {"completed": False, "output": None})

            # A task is ready when it is not done and all its dependencies are.
            ready_tasks = [
                (task["id"], task)
                for task in tasks
                if not task_outputs[task["id"]]["completed"]
                and all(
                    dep in task_outputs and task_outputs[dep]["completed"]
                    for dep in task.get('dependent_task_ids', [])
                )
            ]

            # Nothing runnable while work remains means nothing can ever
            # change again; fail instead of spinning forever.
            if not ready_tasks:
                raise RuntimeError("no runnable tasks remain but some tasks are still incomplete")

            for task_id, task in ready_tasks:
                run_single_task(task_id, task, global_skill_registry, task_outputs, OBJECTIVE, task_registry)

            time.sleep(0.1)

            # Assume the last task in the list carries the latest output.
            # Compare against None so that a task id of 0 is still honoured.
            last_task_id = tasks[-1]["id"] if tasks else None
            task_output = task_outputs[last_task_id]["output"] if last_task_id is not None else None

        task_registry.reflect_on_final(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)
        global_skill_registry.reflect_skills(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)

        # Create the target directory first so a missing folder does not turn
        # an otherwise successful run into an error result.
        os.makedirs("output", exist_ok=True)
        with open(f'output/output_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}.txt', 'w', encoding="utf-8") as file:
            file.write(session_summary)
        print("...file saved.")
        print("END")

        return task_output
    except Exception as e:
        # Callers receive the failure as a string; also print it so that
        # script use (which discards the return value) does not fail silently.
        print(f"Error in run_main_loop: {e}")
        return f"An error occurred: {e}"
# Shared skill registry for the whole module, built from the configured API
# keys and skill names; run_main_loop is handed over as its main-loop callback.
global_skill_registry = SkillRegistry(
    api_keys=api_keys,
    main_loop_function=run_main_loop,
    skill_names=LOAD_SKILLS,
)
def execute_skill(skill_name, objective, task_id):
    """Execute a single skill and record the outcome in ``ongoing_tasks``.

    Args:
        skill_name: Name used to look the skill up in the global registry.
        objective: Passed to the skill as both its first and third argument.
        task_id: Key of the entry in ``ongoing_tasks`` to update.

    Returns:
        ``task_id`` when the skill ran (whether it succeeded or raised), or
        the string ``"Skill not found :("`` when the lookup returned nothing.
    """
    skill = global_skill_registry.get_skill(skill_name)
    if not skill:
        message = "Skill not found :("
        # Record the failure as well; otherwise the task entry keeps its
        # previous status and never reflects that nothing was run.
        if task_id in ongoing_tasks:
            ongoing_tasks[task_id].update({"status": "error", "error": message})
        return message

    try:
        result = skill.execute(objective, "", objective)
        ongoing_tasks[task_id].update({"status": "completed", "output": result})
    except Exception as e:
        ongoing_tasks[task_id].update({"status": "error", "error": str(e)})
    return task_id
def execute_task_list(objective, api_keys, task_id):
    """Run the main loop for ``objective`` and store the outcome under ``task_id``.

    Returns ``(tasks, task_id)`` when no exception was caught, where ``tasks``
    comes from a TaskRegistry created inside this function, and only
    ``task_id`` when an exception was caught.
    """
    # NOTE(review): the registry below is created here and never handed to
    # run_main_loop, so the returned task list may not be the one that was
    # executed - confirm against TaskRegistry.
    # NOTE(review): run_main_loop reports its own failures as a returned
    # string, so such a run is still stored with status "completed" - confirm
    # this is what consumers of ongoing_tasks expect.
    try:
        registry = TaskRegistry()
        final_output = run_main_loop(objective, get_skills(), api_keys)
        completion = {"status": "completed", "output": final_output}
        ongoing_tasks[task_id].update(completion)
        return registry.get_tasks(), task_id
    except Exception as exc:
        failure = {"status": "error", "error": str(exc)}
        ongoing_tasks[task_id].update(failure)
        print(f"Error in execute_task_list: {exc}")
        return task_id
def get_skills():
    """Announce the request on stdout and hand back the module-level skill registry."""
    print("Returning GLOBAL SKILL REGISTRY")
    # Reading a module-level name needs no ``global`` declaration.
    registry = global_skill_registry
    return registry
# global_skill_registry is already constructed once, right after run_main_loop
# is defined; building it a second time here with identical arguments only
# repeated the construction work, so the duplicate was dropped.
if __name__ == "__main__":
    # Script entry point: run the default objective with the configured skills.
    run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION)