File size: 6,256 Bytes
14dc68f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
from dotenv import load_dotenv
import time
from datetime import datetime
from skills.skill_registry import SkillRegistry
from tasks.task_registry import TaskRegistry
from ongoing_tasks import ongoing_tasks

load_dotenv()  # Load environment variables from .env file

api_keys = {
    'openai': os.getenv('OPENAI_API_KEY'),
    'serpapi': os.getenv('SERPAPI_API_KEY')
    #'airtable': os.getenv('AIRTABLE_API_KEY')
}

OBJECTIVE = "Research Yohei Nakajima and write a poem about him."
LOAD_SKILLS = ['web_search', 'text_completion', 'code_reader','google_jobs_api_search','image_generation','startup_analysis','play_music','game_generation']
#add web_search and documentation_search after you add SERPAPI_API_KEY in your secrets. airtable_search once you've added your AIRTABLE_API_KEY, and add base/table/column data to airtable_search.py, etc...
REFLECTION = False #Experimental reflection step between each task run (when running tasklist)

def run_single_task(task_id, task, skill_registry, task_outputs, OBJECTIVE, task_registry):
    """Execute a single task and update its status"""
    task_output = task_registry.execute_task(task_id, task, skill_registry, task_outputs, OBJECTIVE)
    
    task_outputs[task_id]["output"] = task_output
    task_outputs[task_id]["completed"] = True
    task_outputs[task_id]["description"] = task.get('description', 'No description available')
    task_outputs[task_id]["skill"] = task.get('skill', 'No skill information available')
    
    if task_output:
        task_registry.update_tasks({"id": task_id, "status": "completed", "result": task_output})

        completed_task = task_registry.get_task(task_id)
        print(f"Task #{task_id}: {completed_task.get('task')} [COMPLETED][{completed_task.get('skill')}]")
        
        if REFLECTION:
            new_tasks, insert_after_ids, tasks_to_update = task_registry.reflect_on_output(task_output, skill_descriptions)
            for new_task, after_id in zip(new_tasks, insert_after_ids):
                task_registry.add_task(new_task, after_id)

            if isinstance(tasks_to_update, dict) and tasks_to_update:
                tasks_to_update = [tasks_to_update]
            
            for task_to_update in tasks_to_update:
                task_registry.update_tasks(task_to_update)




def run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION=False):
    """Main execution loop"""
    try:
        skill_descriptions = ",".join(f"[{skill.name}: {skill.description}]" for skill in global_skill_registry.skills.values())
        task_registry = TaskRegistry()
        task_registry.create_tasklist(OBJECTIVE, skill_descriptions)

        skill_names = [skill.name for skill in global_skill_registry.skills.values()]
        session_summary = f"OBJECTIVE:{OBJECTIVE}.#SKILLS:{','.join(skill_names)}.#"

        task_outputs = {task["id"]: {"completed": False, "output": None} for task in task_registry.get_tasks()}

        task_output = None  # Initialize task_output to None

        while not all(task["completed"] for task in task_outputs.values()):
            tasks = task_registry.get_tasks()
            task_registry.print_tasklist(tasks)

            for task in tasks:
                if task["id"] not in task_outputs:
                    task_outputs[task["id"]] = {"completed": False, "output": None}

                ready_tasks = [(task["id"], task) for task in tasks if all((dep in task_outputs and task_outputs[dep]["completed"]) for dep in task.get('dependent_task_ids', [])) and not task_outputs[task["id"]]["completed"]]

                for task_id, task in ready_tasks:
                    run_single_task(task_id, task, global_skill_registry, task_outputs, OBJECTIVE, task_registry)

                time.sleep(0.1)

            # Assuming the last task in tasks has the latest output. Adjust if your use case is different.
            last_task_id = tasks[-1]["id"] if tasks else None
            task_output = task_outputs[last_task_id]["output"] if last_task_id else None

            task_registry.reflect_on_final(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)
            global_skill_registry.reflect_skills(OBJECTIVE, task_registry.get_tasks(), task_output, skill_descriptions)

            with open(f'output/output_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}.txt', 'w') as file:
                file.write(session_summary)
            print("...file saved.")
            print("END")

        return task_output  # Return the last task output

    except Exception as e:
        return f"An error occurred: {e}"



# Removed repeated logic for initiating skill registry
global_skill_registry = SkillRegistry(api_keys=api_keys, main_loop_function=run_main_loop, skill_names=LOAD_SKILLS)


def execute_skill(skill_name, objective, task_id):
    """Execute a single skill"""
    skill = global_skill_registry.get_skill(skill_name)
    if skill:
        try:
            result = skill.execute(objective, "", objective)
            ongoing_tasks[task_id].update({"status": "completed", "output": result})
        except Exception as e:
            ongoing_tasks[task_id].update({"status": "error", "error": str(e)})
        return task_id
    return "Skill not found :("

def execute_task_list(objective, api_keys, task_id):
    """Execute a list of tasks"""
    try:
        task_registry = TaskRegistry()
        result = run_main_loop(objective, get_skills(), api_keys)
        ongoing_tasks[task_id].update({"status": "completed", "output": result})
        return task_registry.get_tasks(), task_id
    except Exception as e:
        ongoing_tasks[task_id].update({"status": "error", "error": str(e)})
        print(f"Error in execute_task_list: {e}")
    return task_id



def get_skills():
    """Return the global skill registry"""
    # Removed repeated logic for initiating skill registry
    global global_skill_registry
    print("Returning GLOBAL SKILL REGISTRY")
    return global_skill_registry

# Removed repeated logic for initiating skill registry
global_skill_registry = SkillRegistry(api_keys=api_keys, main_loop_function=run_main_loop, skill_names=LOAD_SKILLS)

if __name__ == "__main__":
    run_main_loop(OBJECTIVE, LOAD_SKILLS, api_keys, REFLECTION)