Spaces:
Sleeping
Sleeping
import threading | |
import uuid | |
from typing import Any, List, Optional | |
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator | |
from pydantic_core import PydanticCustomError | |
from crewai.agent import Agent | |
from crewai.tasks.task_output import TaskOutput | |
from crewai.utilities import I18N | |
class Task(BaseModel): | |
"""Class that represent a task to be executed.""" | |
class Config: | |
arbitrary_types_allowed = True | |
__hash__ = object.__hash__ # type: ignore | |
i18n: I18N = I18N() | |
thread: threading.Thread = None | |
description: str = Field(description="Description of the actual task.") | |
callback: Optional[Any] = Field( | |
description="Callback to be executed after the task is completed.", default=None | |
) | |
agent: Optional[Agent] = Field( | |
description="Agent responsible for execution the task.", default=None | |
) | |
expected_output: Optional[str] = Field( | |
description="Clear definition of expected output for the task.", | |
default=None, | |
) | |
context: Optional[List["Task"]] = Field( | |
description="Other tasks that will have their output used as context for this task.", | |
default=None, | |
) | |
async_execution: Optional[bool] = Field( | |
description="Whether the task should be executed asynchronously or not.", | |
default=False, | |
) | |
output: Optional[TaskOutput] = Field( | |
description="Task output, it's final result after being executed", default=None | |
) | |
tools: List[Any] = Field( | |
default_factory=list, | |
description="Tools the agent is limited to use for this task.", | |
) | |
id: UUID4 = Field( | |
default_factory=uuid.uuid4, | |
frozen=True, | |
description="Unique identifier for the object, not set by user.", | |
) | |
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: | |
if v: | |
raise PydanticCustomError( | |
"may_not_set_field", "This field is not to be set by the user.", {} | |
) | |
def check_tools(self): | |
"""Check if the tools are set.""" | |
if not self.tools and self.agent and self.agent.tools: | |
self.tools.extend(self.agent.tools) | |
return self | |
def execute( | |
self, | |
agent: Agent | None = None, | |
context: Optional[str] = None, | |
tools: Optional[List[Any]] = None, | |
) -> str: | |
"""Execute the task. | |
Returns: | |
Output of the task. | |
""" | |
agent = agent or self.agent | |
if not agent: | |
raise Exception( | |
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." | |
) | |
if self.context: | |
context = [] | |
for task in self.context: | |
if task.async_execution: | |
task.thread.join() | |
context.append(task.output.result) | |
context = "\n".join(context) | |
tools = tools or self.tools | |
if self.async_execution: | |
self.thread = threading.Thread( | |
target=self._execute, args=(agent, self._prompt(), context, tools) | |
) | |
self.thread.start() | |
else: | |
result = self._execute( | |
agent=agent, | |
task_prompt=self._prompt(), | |
context=context, | |
tools=tools, | |
) | |
return result | |
def _execute(self, agent, task_prompt, context, tools): | |
result = agent.execute_task(task=task_prompt, context=context, tools=tools) | |
self.output = TaskOutput(description=self.description, result=result) | |
self.callback(self.output) if self.callback else None | |
return result | |
def _prompt(self) -> str: | |
"""Prompt the task. | |
Returns: | |
Prompt of the task. | |
""" | |
tasks_slices = [self.description] | |
if self.expected_output: | |
output = self.i18n.slice("expected_output").format( | |
expected_output=self.expected_output | |
) | |
tasks_slices = [self.description, output] | |
return "\n".join(tasks_slices) | |