import os
import json
import bcrypt
from typing import List
from pathlib import Path
from operator import itemgetter
from datetime import timedelta

from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import StrOutputParser
from langchain.schema.runnable import Runnable, RunnablePassthrough, RunnableConfig, RunnableLambda
from langchain.callbacks.base import BaseCallbackHandler
from langchain.memory import ConversationBufferMemory
from langchain.chains import (
    StuffDocumentsChain,
    ConversationalRetrievalChain,
)
from langchain_community.chat_message_histories import ChatMessageHistory
from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore

import chainlit as cl
from chainlit.input_widget import TextInput, Select, Switch, Slider
from deep_translator import GoogleTranslator
from literalai import AsyncLiteralClient

async_literal_client = AsyncLiteralClient(api_key=os.getenv("LITERAL_API_KEY"))


@cl.password_auth_callback
def auth_callback(username: str, password: str):
    # CHAINLIT_AUTH_LOGIN holds a JSON list of {"ident", "pwd", "role"} records.
    auth = json.loads(os.environ['CHAINLIT_AUTH_LOGIN'])
    account = next((d for d in auth if d['ident'] == username), None)
    if account is None:
        # Unknown user: refuse the login instead of raising StopIteration.
        return None
    ident = account['ident']
    pwd = account['pwd']
    # Note: hashing the stored values on the fly and checking the submitted ones
    # against the fresh hash is equivalent to a plain string comparison.
    resultLogAdmin = bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(ident.encode('utf-8'), bcrypt.gensalt()))
    resultPwdAdmin = bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(pwd.encode('utf-8'), bcrypt.gensalt()))
    resultRole = account['role']
    if resultLogAdmin and resultPwdAdmin and resultRole == "admindatapcc":
        return cl.User(
            identifier=ident + " : 🧑‍💼 Admin Datapcc",
            metadata={"role": "admin", "provider": "credentials"}
        )
    elif resultLogAdmin and resultPwdAdmin and resultRole == "userdatapcc":
        return cl.User(
            identifier=ident + " : 🧑‍🎓 User Datapcc",
            metadata={"role": "user", "provider": "credentials"}
        )
    return None
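# Example CHAINLIT_AUTH_LOGIN value (hypothetical credentials, shown only to
# document the shape expected by auth_callback above):
#   export CHAINLIT_AUTH_LOGIN='[
#     {"ident": "admin", "pwd": "s3cret", "role": "admindatapcc"},
#     {"ident": "etudiant", "pwd": "s3cret", "role": "userdatapcc"}
#   ]'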
@cl.step(type="tool")
async def LLModel():
    # HUGGINGFACEHUB_API_TOKEN is read from the environment by HuggingFaceEndpoint.
    #repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1"
    repo_id = "mistralai/Mistral-7B-Instruct-v0.3"
    llm = HuggingFaceEndpoint(
        repo_id=repo_id,
        max_new_tokens=5300,
        temperature=0.5,
        task="text-generation",  # Mistral is a causal LM: text-generation, not text2text-generation
        streaming=True
    )
    return llm


@cl.step(type="tool")
async def VectorDatabase(categorie):
    # Each corpus lives in its own Pinecone index, behind its own API key.
    if categorie == "bibliographie-OPP-DGDIN":
        index_name = "all-venus"
        api_key = os.getenv('PINECONE_API_KEY')
    elif categorie == "year" or categorie == "videosTC":
        index_name = "all-jdlp"
        api_key = os.getenv('PINECONE_API_KEYJDLP')
    elif categorie == "skills":
        index_name = "all-skills"
        api_key = os.getenv('PINECONE_API_KEYSKILLS')
    else:
        raise ValueError(f"Unknown categorie: {categorie}")
    embeddings = HuggingFaceEmbeddings()
    vectorstore = PineconeVectorStore(
        index_name=index_name,
        embedding=embeddings,
        pinecone_api_key=api_key
    )
    return vectorstore


@cl.step(type="retrieval")
async def Retriever(categorie):
    vectorstore = await VectorDatabase(categorie)
    if categorie == "bibliographie-OPP-DGDIN":
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"score_threshold": .7, "k": 150, "filter": {'categorie': {'$eq': categorie}}}
        )
    elif categorie == "year":
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"score_threshold": .7, "k": 6, "filter": {'year': {'$gte': 2019}}}
        )
    elif categorie == "skills":
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"score_threshold": .7, "k": 200, "filter": {'file': {'$eq': 'competences-master-CFA.csv'}}}
        )
    elif categorie == "videosTC":
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"score_threshold": .7, "k": 200, "filter": {"title": {"$eq": "videos-confinement-timeline"}}}
        )
    return retriever
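# The "filter" entries above use Pinecone's metadata operators: $eq for exact
# matches, $gte for numeric lower bounds. A minimal usage sketch (hypothetical
# query string), assuming the "year" index is populated:
#   retriever = await Retriever("year")
#   docs = retriever.invoke("pédagogie par projet")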
" + str(count) + ". " + search[i].metadata['Phrase'] + "
" elif categorie == "year": search = vectorstore.similarity_search(input,k=50, filter={"year": {"$gte": 2019}}) for i in range(0,len(search)): if count <= 15: count = count + 1 sources_text = sources_text + str(count) + ". " + search[i].metadata['title'] + ' (JDLP : ' + str(search[i].metadata['year']) + '), ' + search[i].metadata['author'] + ', https://cipen.univ-gustave-eiffel.fr/fileadmin/CIPEN/OPP/' + search[i].metadata['file'] + "\n" verbatim_text = verbatim_text + "
" + str(count) + ". JDLP : " + search[i].metadata['jdlp'] + "
" + search[i].page_content + "
" elif categorie == "skills": search = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'competences-master-CFA.csv'}}) searchOffres = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'marche-emploi-CFA.csv'}}) for i in range(0,len(search)): if count <= 15: count = count + 1 sources_text = sources_text + str(count) + ". " + search[i].metadata['diplôme'] + ' (année : ' + search[i].metadata['année'] + '), ' + search[i].metadata['domaine'] + ', https://www.francecompetences.fr/recherche/rncp/' + str(search[i].metadata['rncp'])[4:] + "/\n" verbatim_text = verbatim_text + "" + str(count) + ". " + search[i].metadata['diplôme'] + "
" + search[i].page_content + "
" for i in range(0,len(searchOffres)): if countOffres <= 15: countOffres = countOffres + 1 sources_offres = sources_offres + str(countOffres) + ". " + searchOffres[i].metadata['Poste'] + " (type de contrat : " + searchOffres[i].metadata['Contrat'] + ")\n" elif categorie == "videosTC": search = vectorstore.similarity_search(input,k=50, filter={"title": {"$eq": "videos-confinement-timeline"}}) for i in range(0,len(search)): if count <= 17: count = count + 1 timeSeq = search[i].metadata["time"] timeSeqRound = round(timeSeq) time = timedelta(seconds=timeSeqRound) sources_text = sources_text + '' verbatim_text = verbatim_text + "" + str(count) + ". " + search[i].metadata['titre'] + "
🕓 "+ str(time) + " : " + search[i].page_content + "
" results = [sources_text, verbatim_text, sources_offres] return results @cl.on_chat_start async def on_chat_start(): await cl.Message(f"> REVIEWSTREAM").send() res = await cl.AskActionMessage( content="Hal Archives Ouvertes : Une archive ouverte est un réservoir numérique contenant des documents issus de la recherche scientifique, généralement déposés par leurs auteurs, et permettant au grand public d'y accéder gratuitement et sans contraintes.
Persée : offre un accès libre et gratuit à des collections complètes de publications scientifiques (revues, livres, actes de colloques, publications en série, sources primaires, etc.) associé à une gamme d'outils de recherche et d'exploitation.
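    # The ChatSettings Select below is rendered but never read back; reacting to
    # it would need a handler along these lines (hypothetical sketch):
    # @cl.on_settings_update
    # async def setup_agent(settings):
    #     cl.user_session.set("selectRequest", settings["Model"])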
""" prompt_elements = [] prompt_elements.append( cl.Text(content=contentPrompts, name=listPrompts_name, display="side") ) await cl.Message(content="📚 " + listPrompts_name, elements=prompt_elements).send() settings = await cl.ChatSettings( [ Select( id="Model", label="Publications de recherche", values=["---", "HAL", "Persée"], initial_index=0, ), ] ).send() if res: await cl.Message(f"Vous pouvez requêter sur la thématique : {res.get('value')}").send() cl.user_session.set("selectRequest", res.get("name")) model = await LLModel() retriever = await Retriever(cl.user_session.get("selectRequest")) ########## Chain with streaming ########## message_history = ChatMessageHistory() memory = ConversationBufferMemory(memory_key="chat_history",output_key="answer",chat_memory=message_history,return_messages=True) qa = ConversationalRetrievalChain.from_llm( model, memory=memory, chain_type="stuff", return_source_documents=True, verbose=False, retriever=retriever ) cl.user_session.set("runnable", qa) cl.user_session.set("memory", memory) @cl.on_message async def on_message(message: cl.Message): memory = cl.user_session.get("memory") runnable = cl.user_session.get("runnable") # type: Runnable msg = cl.Message(content="") class PostMessageHandler(BaseCallbackHandler): """ Callback handler for handling the retriever and LLM processes. Used to post the sources of the retrieved documents as a Chainlit element. """ def __init__(self, msg: cl.Message): BaseCallbackHandler.__init__(self) self.msg = msg self.sources = set() # To store unique pairs def on_retriever_end(self, documents, *, run_id, parent_run_id, **kwargs): for d in documents: source_page_pair = (d.metadata['source'], d.metadata['page']) self.sources.add(source_page_pair) # Add unique pairs to the set def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs): sources_text = "\n".join([f"{source}#page={page}" for source, page in self.sources]) self.msg.elements.append( cl.Text(name="Sources", content=sources_text, display="inline") ) with async_literal_client.thread() as thread: cb = cl.AsyncLangchainCallbackHandler() results = await runnable.acall("Contexte : Vous êtes un chercheur de l'enseignement supérieur et vous êtes doué pour faire des analyses d'articles de recherche sur les thématiques liées à la pédagogie, en fonction des critères définis ci-avant. En fonction des informations suivantes et du contexte suivant seulement et strictement, répondez en langue française strictement à la question ci-dessous, en 5500 mots minimum. En plus, tu créeras et tu afficheras, à la fin de ta réponse, 3 questions supplémentaires en relation avec le contexte initial, à chaque étape de la conversation. Tu écriras et tu afficheras les 3 questions supplémentaires en relation avec le contexte initial, à la fin de ta réponse, avec un titrage de niveau 1 qui a pour titre \"Questions en relation avec le contexte : \". Lorsque cela est possible, cite les sources du contexte. Si vous ne pouvez pas répondre à la question sur la base des informations, dites que vous ne trouvez pas de réponse ou que vous ne parvenez pas à trouver de réponse. Essayez donc de comprendre en profondeur le contexte et répondez uniquement en vous basant sur les informations fournies. Ne générez pas de réponses non pertinentes. 
Question : " + message.content, callbacks=[cb]) answer = results["answer"] await cl.Message(content=GoogleTranslator(source='auto', target='fr').translate(answer)).send() #search = vectorstore.similarity_search(message.content,k=50, filter={"categorie": {"$eq": "bibliographie-OPP-DGDIN"}}) search = await Search(message.content, cl.user_session.get("selectRequest")) #os.environ["GOOGLE_CSE_ID"] = os.getenv('GOOGLE_CSE_ID') #os.environ["GOOGLE_API_KEY"] = os.getenv('GOOGLE_API_KEY') #searchAPI = GoogleSearchAPIWrapper() #def top5_results(query): # return searchAPI.results(query, 5) #tool = Tool( # name="Google Search Snippets", # description="Search Google for recent results.", # func=top5_results, #) #query = str(message.content) #ref_text = tool.run(query) #if 'Result' not in ref_text[0].keys(): # print(ref_text) #else: # print('None') sources = [ cl.Text(name="Sources", content=search[0], display="inline") ] await cl.Message( content="Sources : ", elements=sources, ).send() if search[2]: sourcesOffres = [ cl.Text(name="Exemples d'offres d'emploi", content=search[2], display="inline") ] await cl.Message( content="Offres d'emploi : ", elements=sourcesOffres, ).send() verbatim = [ cl.Text(name="Verbatim", content=search[1], display="side") ] await cl.Message( content="📚 Liste des Verbatim ", elements=verbatim, ).send()