Spaces:
Sleeping
Sleeping
import json | |
import uuid | |
from typing import Any, Dict, List, Optional, Union | |
from pydantic import ( | |
UUID4, | |
BaseModel, | |
ConfigDict, | |
Field, | |
InstanceOf, | |
Json, | |
PrivateAttr, | |
field_validator, | |
model_validator, | |
) | |
from pydantic_core import PydanticCustomError | |
from crewai.agent import Agent | |
from crewai.agents.cache import CacheHandler | |
from crewai.process import Process | |
from crewai.task import Task | |
from crewai.telemtry import Telemetry | |
from crewai.tools.agent_tools import AgentTools | |
from crewai.utilities import I18N, Logger, RPMController | |
class Crew(BaseModel): | |
""" | |
Represents a group of agents, defining how they should collaborate and the tasks they should perform. | |
Attributes: | |
tasks: List of tasks assigned to the crew. | |
agents: List of agents part of this crew. | |
manager_llm: The language model that will run manager agent. | |
process: The process flow that the crew will follow (e.g., sequential). | |
verbose: Indicates the verbosity level for logging during execution. | |
config: Configuration settings for the crew. | |
_cache_handler: Handles caching for the crew's operations. | |
max_rpm: Maximum number of requests per minute for the crew execution to be respected. | |
id: A unique identifier for the crew instance. | |
""" | |
__hash__ = object.__hash__ # type: ignore | |
_rpm_controller: RPMController = PrivateAttr() | |
_logger: Logger = PrivateAttr() | |
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) | |
model_config = ConfigDict(arbitrary_types_allowed=True) | |
tasks: List[Task] = Field(default_factory=list) | |
agents: List[Agent] = Field(default_factory=list) | |
process: Process = Field(default=Process.sequential) | |
verbose: Union[int, bool] = Field(default=0) | |
manager_llm: Optional[Any] = Field( | |
description="Language model that will run the agent.", default=None | |
) | |
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None) | |
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) | |
max_rpm: Optional[int] = Field( | |
default=None, | |
description="Maximum number of requests per minute for the crew execution to be respected.", | |
) | |
language: str = Field( | |
default="en", | |
description="Language used for the crew, defaults to English.", | |
) | |
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: | |
"""Prevent manual setting of the 'id' field by users.""" | |
if v: | |
raise PydanticCustomError( | |
"may_not_set_field", "The 'id' field cannot be set by the user.", {} | |
) | |
def check_config_type( | |
cls, v: Union[Json, Dict[str, Any]] | |
) -> Union[Json, Dict[str, Any]]: | |
"""Validates that the config is a valid type. | |
Args: | |
v: The config to be validated. | |
Returns: | |
The config if it is valid. | |
""" | |
# TODO: Improve typing | |
return json.loads(v) if isinstance(v, Json) else v # type: ignore | |
def set_private_attrs(self) -> "Crew": | |
"""Set private attributes.""" | |
self._cache_handler = CacheHandler() | |
self._logger = Logger(self.verbose) | |
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger) | |
self._telemetry = Telemetry() | |
self._telemetry.crew_creation(self) | |
return self | |
def check_manager_llm(self): | |
"""Validates that the language model is set when using hierarchical process.""" | |
if self.process == Process.hierarchical and not self.manager_llm: | |
raise PydanticCustomError( | |
"missing_manager_llm", | |
"Attribute `manager_llm` is required when using hierarchical process.", | |
{}, | |
) | |
return self | |
def check_config(self): | |
"""Validates that the crew is properly configured with agents and tasks.""" | |
if not self.config and not self.tasks and not self.agents: | |
raise PydanticCustomError( | |
"missing_keys", | |
"Either 'agents' and 'tasks' need to be set or 'config'.", | |
{}, | |
) | |
if self.config: | |
self._setup_from_config() | |
if self.agents: | |
for agent in self.agents: | |
agent.set_cache_handler(self._cache_handler) | |
if self.max_rpm: | |
agent.set_rpm_controller(self._rpm_controller) | |
return self | |
def _setup_from_config(self): | |
assert self.config is not None, "Config should not be None." | |
"""Initializes agents and tasks from the provided config.""" | |
if not self.config.get("agents") or not self.config.get("tasks"): | |
raise PydanticCustomError( | |
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {} | |
) | |
self.agents = [Agent(**agent) for agent in self.config["agents"]] | |
self.tasks = [self._create_task(task) for task in self.config["tasks"]] | |
def _create_task(self, task_config: Dict[str, Any]) -> Task: | |
"""Creates a task instance from its configuration. | |
Args: | |
task_config: The configuration of the task. | |
Returns: | |
A task instance. | |
""" | |
task_agent = next( | |
agt for agt in self.agents if agt.role == task_config["agent"] | |
) | |
del task_config["agent"] | |
return Task(**task_config, agent=task_agent) | |
def kickoff(self) -> str: | |
"""Starts the crew to work on its assigned tasks.""" | |
for agent in self.agents: | |
agent.i18n = I18N(language=self.language) | |
if self.process == Process.sequential: | |
return self._run_sequential_process() | |
if self.process == Process.hierarchical: | |
return self._run_hierarchical_process() | |
raise NotImplementedError( | |
f"The process '{self.process}' is not implemented yet." | |
) | |
def _run_sequential_process(self) -> str: | |
"""Executes tasks sequentially and returns the final output.""" | |
task_output = "" | |
for task in self.tasks: | |
if task.agent is not None and task.agent.allow_delegation: | |
agents_for_delegation = [ | |
agent for agent in self.agents if agent != task.agent | |
] | |
task.tools += AgentTools(agents=agents_for_delegation).tools() | |
role = task.agent.role if task.agent is not None else "None" | |
self._logger.log("debug", f"Working Agent: {role}") | |
self._logger.log("info", f"Starting Task: {task.description}") | |
output = task.execute(context=task_output) | |
if not task.async_execution: | |
task_output = output | |
role = task.agent.role if task.agent is not None else "None" | |
self._logger.log("debug", f"[{role}] Task output: {task_output}\n\n") | |
if self.max_rpm: | |
self._rpm_controller.stop_rpm_counter() | |
return task_output | |
def _run_hierarchical_process(self) -> str: | |
"""Creates and assigns a manager agent to make sure the crew completes the tasks.""" | |
i18n = I18N(language=self.language) | |
manager = Agent( | |
role=i18n.retrieve("hierarchical_manager_agent", "role"), | |
goal=i18n.retrieve("hierarchical_manager_agent", "goal"), | |
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), | |
tools=AgentTools(agents=self.agents).tools(), | |
llm=self.manager_llm, | |
verbose=True, | |
) | |
task_output = "" | |
for task in self.tasks: | |
self._logger.log("debug", f"Working Agent: {manager.role}") | |
self._logger.log("info", f"Starting Task: {task.description}") | |
task_output = task.execute( | |
agent=manager, context=task_output, tools=manager.tools | |
) | |
self._logger.log( | |
"debug", f"[{manager.role}] Task output: {task_output}\n\n" | |
) | |
if self.max_rpm: | |
self._rpm_controller.stop_rpm_counter() | |
return task_output | |