kenken999's picture
updatest
14dc68f
raw
history blame
1.99 kB
import sys
import logging
import ray
from collections import deque
from typing import Dict, List
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from extensions.ray_objectives import CooperativeObjectivesListStorage
try:
ray.init(address="auto", namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
except:
ray.init(namespace="babyagi", logging_level=logging.FATAL, ignore_reinit_error=True)
@ray.remote
class CooperativeTaskListStorageActor:
def __init__(self):
self.tasks = deque([])
self.task_id_counter = 0
def append(self, task: Dict):
self.tasks.append(task)
def replace(self, tasks: List[Dict]):
self.tasks = deque(tasks)
def popleft(self):
return self.tasks.popleft()
def is_empty(self):
return False if self.tasks else True
def next_task_id(self):
self.task_id_counter += 1
return self.task_id_counter
def get_task_names(self):
return [t["task_name"] for t in self.tasks]
class CooperativeTaskListStorage:
def __init__(self, name: str):
self.name = name
try:
self.actor = ray.get_actor(name=self.name, namespace="babyagi")
except ValueError:
self.actor = CooperativeTaskListStorageActor.options(name=self.name, namespace="babyagi", lifetime="detached").remote()
objectives = CooperativeObjectivesListStorage()
objectives.append(self.name)
def append(self, task: Dict):
self.actor.append.remote(task)
def replace(self, tasks: List[Dict]):
self.actor.replace.remote(tasks)
def popleft(self):
return ray.get(self.actor.popleft.remote())
def is_empty(self):
return ray.get(self.actor.is_empty.remote())
def next_task_id(self):
return ray.get(self.actor.next_task_id.remote())
def get_task_names(self):
return ray.get(self.actor.get_task_names.remote())