aie4-final / helper_functions.py
angry-meow's picture
moving to a graph
0f09cc9
from typing import Dict, List
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_community.document_loaders import PyMuPDFLoader, TextLoader, UnstructuredURLLoader, WebBaseLoader
from langchain_community.vectorstores import Qdrant
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.language_models import BaseLanguageModel
import os
import functools
import requests
import tempfile
from chainlit.types import AskFileResponse
def process_file(uploaded_file: "AskFileResponse"):
    """Load an uploaded PDF or TXT file through the matching document loader.

    The upload's bytes are written to a temporary file because both loaders
    are given a filesystem path. The temporary file is removed again once
    loading has finished (or failed).

    Args:
        uploaded_file: Upload object exposing ``name`` (only its ``.pdf`` /
            ``.txt`` suffix is inspected) and ``content`` (the raw bytes).

    Returns:
        Whatever the selected loader's ``load()`` returns.

    Raises:
        ValueError: If the file name ends with neither ``.pdf`` nor ``.txt``.
    """
    if uploaded_file.name.endswith(".pdf"):
        suffix, loader_cls = ".pdf", PyMuPDFLoader
    elif uploaded_file.name.endswith(".txt"):
        suffix, loader_cls = ".txt", TextLoader
    else:
        raise ValueError("Unsupported file format. Only PDF and TXT are supported.")

    # Write the bytes through the temporary file's own binary handle instead
    # of opening a text-mode handle and re-opening the same path to write.
    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=suffix) as temp_file:
        temp_file.write(uploaded_file.content)
        temp_file_path = temp_file.name

    try:
        return loader_cls(temp_file_path).load()
    finally:
        # delete=False keeps the file on disk after the handle closes, so it
        # has to be removed explicitly. Cleanup is best-effort: a failure to
        # delete must not mask the loader's result or its exception.
        try:
            os.remove(temp_file_path)
        except OSError:
            pass
def load_documents_from_url(url):
    """Load documents from a URL, returning ``None`` when nothing can be loaded.

    URLs ending in ``.pdf`` are handed to ``PyMuPDFLoader``. URLs containing
    ``youtube`` and resources whose ``Content-Type`` header contains ``video``
    are skipped. Everything else is loaded with ``UnstructuredURLLoader``.

    This function is best-effort: every failure is printed and reported as
    ``None`` rather than raised.

    Args:
        url: The address to load.

    Returns:
        The loader's ``load()`` result, or ``None`` if the URL was skipped or
        any step failed.
    """
    try:
        # Check if it's a PDF
        if url.endswith(".pdf"):
            try:
                return PyMuPDFLoader(url).load()
            except Exception as e:
                print(f"Error loading PDF from {url}: {e}")
                return None

        # YouTube links are always skipped, so reject them before spending a
        # network round trip on the header check below.
        if 'youtube' in url:
            return None

        # Fetch only the headers to detect video pages. HEAD requests do not
        # follow redirects unless asked to, and the Content-Type of a redirect
        # response says nothing about the final resource.
        try:
            response = requests.head(url, timeout=10, allow_redirects=True)
            content_type = response.headers.get('Content-Type', '')
        except Exception as e:
            print(f"Error fetching headers from {url}: {e}")
            return None

        # Ignore video content (flagged for now)
        if 'video' in content_type:
            return None

        # Otherwise, treat it as an HTML page
        try:
            return UnstructuredURLLoader([url]).load()
        except Exception as e:
            print(f"Error loading HTML from {url}: {e}")
            return None
    except Exception as e:
        # Catch-all for anything the targeted handlers above do not cover
        # (for example a ``url`` that is not a string).
        print(f"General error loading from {url}: {e}")
        return None
def add_to_qdrant(documents, embeddings, qdrant_client, collection_name):
    """Send ``documents`` to ``Qdrant.from_documents`` for ``collection_name``.

    The target server is taken from ``qdrant_client.url`` and gRPC is
    preferred. The object returned by ``Qdrant.from_documents`` is discarded,
    so this function returns ``None``.
    """
    store_options = {
        "url": qdrant_client.url,
        "prefer_grpc": True,
        "collection_name": collection_name,
    }
    Qdrant.from_documents(documents, embeddings, **store_options)
def agent_node(state, agent, name):
    """Invoke ``agent`` on ``state`` and repackage its last message.

    Returns a dict with a single-element ``"messages"`` list holding a
    ``HumanMessage`` whose content is copied from the final entry of the
    result's ``"messages"`` and whose ``name`` is the given ``name``.
    """
    outcome = agent.invoke(state)
    # NOTE(review): assumes the agent's result carries its reply as the last
    # item of "messages" - confirm this holds for the agents passed in here.
    final_message = outcome["messages"][-1]
    reply = HumanMessage(content=final_message.content, name=name)
    return {"messages": [reply]}
def create_team_agent(llm, tools, system_prompt, agent_name, team_members):
    """Create a team agent whose prompt also lists the working-directory files.

    ``system_prompt`` is extended with a ``{current_files}`` template
    placeholder and passed to ``create_agent`` together with ``llm`` and
    ``tools``. ``agent_name`` and ``team_members`` are accepted but not used.
    """
    # The doubled braces survive the f-string as a single-brace placeholder.
    prompt_with_files = (
        f"{system_prompt}\nBelow are files currently in your directory:\n{{current_files}}"
    )
    return create_agent(llm, tools, prompt_with_files)
def create_agent_node(agent, name):
    """Return ``agent_node`` with ``agent`` and ``name`` pre-bound as keywords.

    The returned callable therefore only needs the state argument.
    """
    bound_node = functools.partial(agent_node, agent=agent, name=name)
    return bound_node
def add_agent_to_graph(graph, agent_name, agent_node):
    """Register ``agent_node`` on ``graph`` and link it to the supervisor.

    Adds a node called ``agent_name``, then an edge from that node to the
    node named ``"supervisor"``.
    """
    supervisor_key = "supervisor"
    graph.add_node(agent_name, agent_node)
    graph.add_edge(agent_name, supervisor_key)
def create_team_supervisor(llm, team_description, team_members):
    """Build the supervisor runnable that routes work between team members.

    The supervisor is a chain of prompt -> function-calling LLM -> JSON
    parser. The LLM is forced to call a ``route`` function whose only
    argument, ``next``, must be one of ``team_members`` or ``"FINISH"``.

    Args:
        llm: Chat model runnable that accepts OpenAI-style ``functions`` and
            ``function_call`` bindings.
        team_description: Extra text appended to the supervisor's system
            prompt after the list of workers.
        team_members: Names of the workers the supervisor may route to.

    Returns:
        A runnable that takes a state containing ``"messages"`` and yields the
        parsed arguments of the ``route`` call, i.e. a dict with key
        ``"next"``.
    """
    # Imported locally because the module's top-level imports do not provide
    # this parser.
    from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser

    members = list(team_members)
    options = ["FINISH"] + members
    # OpenAI function schema restricting the model's answer to a valid route.
    route_function = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "type": "object",
            "properties": {"next": {"type": "string", "enum": options}},
            "required": ["next"],
        },
    }
    system_prompt = (
        f"You are a supervisor tasked with managing a conversation between the"
        f" following workers: {', '.join(members)}. {team_description}"
        f" When all workers are finished, you must respond with FINISH."
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options))
    return (
        prompt
        # Forcing function_call makes every response a "route" call.
        | llm.bind(functions=[route_function], function_call={"name": "route"})
        | JsonOutputFunctionsParser()
    )
def enter_chain(message: str, members: List[str]):
    """Build the initial state dict for a team chain.

    ``"messages"`` holds ``message`` wrapped in a single ``HumanMessage`` and
    ``"team_members"`` holds ``members`` joined with ``", "``.
    """
    opening_message = HumanMessage(content=message)
    roster = ", ".join(members)
    return {"messages": [opening_message], "team_members": roster}
def create_team_chain(graph, team_members):
    """Pipe ``enter_chain`` (bound to ``team_members``) into the compiled graph.

    Returns the result of applying ``|`` to the bound entry function and
    ``graph.compile()``.
    """
    entry = functools.partial(enter_chain, members=team_members)
    compiled_graph = graph.compile()
    return entry | compiled_graph
def create_agent(
    llm: BaseLanguageModel,
    tools: list,
    system_prompt: str,
) -> AgentExecutor:
    """Create an OpenAI-functions agent wrapped in an ``AgentExecutor``.

    Args:
        llm: Language model driving the agent.
        tools: Tools given both to the agent and to its executor.
        system_prompt: Base system prompt; standard teamwork instructions are
            appended to it before the prompt template is built.

    Returns:
        The ``AgentExecutor`` running the agent. Nothing is added to any
        graph here.
    """
    # NOTE(review): this is a plain (non-f) string, so the doubled braces reach
    # the prompt template unchanged. Presumably the template then treats them
    # as escaped literal braces rather than a "team_members" variable -
    # confirm whether single braces were intended.
    system_prompt += ("\nWork autonomously according to your specialty, using the tools available to you."
                      " Do not ask for clarification."
                      " Your other team members (and other teams) will collaborate with you with their own specialties."
                      " You are chosen for a reason! You are one of the following team members: {{team_members}}.")
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            # Conversation so far, followed by the agent's intermediate steps.
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_functions_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor
def format_docs(docs: List[Dict]) -> str:
    """Render documents as ``Content: ...`` / ``Source: ...`` text blocks.

    Each item is read through its ``page_content`` attribute and its
    ``metadata`` mapping; a missing ``'source'`` entry is shown as
    ``Unknown``. Blocks are separated by a blank line, and an empty input
    gives an empty string.
    """
    rendered = []
    for doc in docs:
        body = doc.page_content
        origin = doc.metadata.get('source', 'Unknown')
        rendered.append(f"Content: {body}\nSource: {origin}")
    return "\n\n".join(rendered)