"""Helper utilities for loading documents, indexing them in Qdrant, and
assembling LangChain/LangGraph agent teams."""

import functools
import os
import tempfile
from typing import List

import requests
from chainlit.types import AskFileResponse
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_community.document_loaders import (
    PyMuPDFLoader,
    TextLoader,
    UnstructuredURLLoader,
    WebBaseLoader,
)
from langchain_community.vectorstores import Qdrant
from langchain_core.documents import Document
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder


def process_file(uploaded_file: AskFileResponse):
    """Persist an uploaded PDF or TXT file to a temporary path and load it into documents."""
    if uploaded_file.name.endswith(".pdf"):
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".pdf") as temp_file:
            temp_file.write(uploaded_file.content)
            temp_file_path = temp_file.name
        # Load PDF with PyMuPDFLoader
        loader = PyMuPDFLoader(temp_file_path)
    elif uploaded_file.name.endswith(".txt"):
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file:
            temp_file.write(uploaded_file.content)
            temp_file_path = temp_file.name
        # Load text file with TextLoader
        loader = TextLoader(temp_file_path)
    else:
        raise ValueError("Unsupported file format. Only PDF and TXT are supported.")

    return loader.load()


def load_documents_from_url(url):
    """Load documents from a URL: PDFs via PyMuPDFLoader, HTML via UnstructuredURLLoader.

    Video pages (including YouTube) are skipped; any failure returns None.
    """
    try:
        # Check if it's a PDF
        if url.endswith(".pdf"):
            try:
                loader = PyMuPDFLoader(url)
                return loader.load()
            except Exception as e:
                print(f"Error loading PDF from {url}: {e}")
                return None

        # Fetch only the headers and check the content type for video pages
        try:
            response = requests.head(url, timeout=10)  # Timeout for fetching headers
            content_type = response.headers.get("Content-Type", "")
        except Exception as e:
            print(f"Error fetching headers from {url}: {e}")
            return None

        # Ignore video content (flagged for now)
        if "video" in content_type:
            return None
        if "youtube" in url:
            return None

        # Otherwise, treat it as an HTML page
        try:
            loader = UnstructuredURLLoader([url])
            return loader.load()
        except Exception as e:
            print(f"Error loading HTML from {url}: {e}")
            return None
    except Exception as e:
        print(f"General error loading from {url}: {e}")
        return None


def add_to_qdrant(documents, embeddings, qdrant_client, collection_name):
    """Embed documents and upsert them into a Qdrant collection."""
    return Qdrant.from_documents(
        documents,
        embeddings,
        url=qdrant_client.url,
        prefer_grpc=True,
        collection_name=collection_name,
    )


def agent_node(state, agent, name):
    """Invoke an agent on the current state and wrap its reply as a named message."""
    result = agent.invoke(state)
    return {
        "messages": [HumanMessage(content=result["messages"][-1].content, name=name)]
    }


def create_team_agent(llm, tools, system_prompt, agent_name, team_members):
    """Create a worker agent whose prompt lists the files currently in its directory."""
    return create_agent(
        llm,
        tools,
        f"{system_prompt}\nBelow are files currently in your directory:\n{{current_files}}",
        # team_members
    )


def create_agent_node(agent, name):
    """Bind an agent and its name into a graph-ready node callable."""
    return functools.partial(agent_node, agent=agent, name=name)


def add_agent_to_graph(graph, agent_name, agent_node):
    """Register a worker node and route its output back to the supervisor."""
    graph.add_node(agent_name, agent_node)
    graph.add_edge(agent_name, "supervisor")
{team_description}" f" When all workers are finished, you must respond with FINISH.", team_members ) def enter_chain(message: str, members: List[str]): results = { "messages": [HumanMessage(content=message)], "team_members": ", ".join(members), } return results def create_team_chain(graph, team_members): return ( functools.partial(enter_chain, members=team_members) | graph.compile() ) def create_agent( llm: BaseLanguageModel, tools: list, system_prompt: str, ) -> str: """Create a function-calling agent and add it to the graph.""" system_prompt += ("\nWork autonomously according to your specialty, using the tools available to you." " Do not ask for clarification." " Your other team members (and other teams) will collaborate with you with their own specialties." " You are chosen for a reason! You are one of the following team members: {{team_members}}.") prompt = ChatPromptTemplate.from_messages( [ ( "system", system_prompt, ), MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="agent_scratchpad"), ] ) agent = create_openai_functions_agent(llm, tools, prompt) executor = AgentExecutor(agent=agent, tools=tools) return executor def format_docs(docs: List[Dict]) -> str: return "\n\n".join(f"Content: {doc.page_content}\nSource: {doc.metadata.get('source', 'Unknown')}" for doc in docs)