Spaces:
Running
Running
import os | |
import json | |
import bcrypt | |
from typing import List | |
from pathlib import Path | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_huggingface import HuggingFaceEndpoint | |
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain.schema import StrOutputParser | |
from operator import itemgetter | |
from pinecone import Pinecone | |
from langchain_pinecone import PineconeVectorStore | |
from langchain_community.chat_message_histories import ChatMessageHistory | |
from langchain.memory import ConversationBufferMemory | |
from langchain.schema.runnable import Runnable, RunnablePassthrough, RunnableConfig, RunnableLambda | |
from langchain.callbacks.base import BaseCallbackHandler | |
from langchain.chains import ( | |
StuffDocumentsChain, ConversationalRetrievalChain | |
) | |
import chainlit as cl | |
from chainlit.input_widget import TextInput, Select, Switch, Slider | |
from deep_translator import GoogleTranslator | |
from datetime import timedelta | |
from literalai import AsyncLiteralClient | |
async_literal_client = AsyncLiteralClient(api_key=os.getenv("LITERAL_API_KEY")) | |
def auth_callback(username: str, password: str): | |
auth = json.loads(os.environ['CHAINLIT_AUTH_LOGIN']) | |
ident = next(d['ident'] for d in auth if d['ident'] == username) | |
pwd = next(d['pwd'] for d in auth if d['ident'] == username) | |
resultLogAdmin = bcrypt.checkpw(username.encode('utf-8'), bcrypt.hashpw(ident.encode('utf-8'), bcrypt.gensalt())) | |
resultPwdAdmin = bcrypt.checkpw(password.encode('utf-8'), bcrypt.hashpw(pwd.encode('utf-8'), bcrypt.gensalt())) | |
resultRole = next(d['role'] for d in auth if d['ident'] == username) | |
if resultLogAdmin and resultPwdAdmin and resultRole == "admindatapcc": | |
return cl.User( | |
identifier=ident + " : 🧑💼 Admin Datapcc", metadata={"role": "admin", "provider": "credentials"} | |
) | |
elif resultLogAdmin and resultPwdAdmin and resultRole == "userdatapcc": | |
return cl.User( | |
identifier=ident + " : 🧑🎓 User Datapcc", metadata={"role": "user", "provider": "credentials"} | |
) | |
async def LLModel(): | |
os.environ['HUGGINGFACEHUB_API_TOKEN'] = os.environ['HUGGINGFACEHUB_API_TOKEN'] | |
#repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
repo_id = "mistralai/Mistral-7B-Instruct-v0.3" | |
llm = HuggingFaceEndpoint( | |
repo_id=repo_id, max_new_tokens=5300, temperature=0.5, task="text2text-generation", streaming=True | |
) | |
return llm | |
async def VectorDatabase(categorie): | |
if categorie == "bibliographie-OPP-DGDIN": | |
index_name = "all-venus" | |
embeddings = HuggingFaceEmbeddings() | |
vectorstore = PineconeVectorStore( | |
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEY') | |
) | |
elif categorie == "year" or categorie == "videosTC": | |
index_name = "all-jdlp" | |
embeddings = HuggingFaceEmbeddings() | |
vectorstore = PineconeVectorStore( | |
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEYJDLP') | |
) | |
elif categorie == "skills": | |
index_name = "all-skills" | |
embeddings = HuggingFaceEmbeddings() | |
vectorstore = PineconeVectorStore( | |
index_name=index_name, embedding=embeddings, pinecone_api_key=os.getenv('PINECONE_API_KEYSKILLS') | |
) | |
return vectorstore | |
async def Retriever(categorie): | |
vectorstore = await VectorDatabase(categorie) | |
if categorie == "bibliographie-OPP-DGDIN": | |
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 150,"filter": {'categorie': {'$eq': categorie}}}) | |
elif categorie == "year": | |
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 6,"filter": {'year': {'$gte': 2019}}}) | |
elif categorie == "skills": | |
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 200,"filter": {'file': {'$eq': 'competences-master-CFA.csv'}}}) | |
elif categorie == "videosTC": | |
retriever = vectorstore.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": .7, "k": 200,"filter": {"title": {"$eq": "videos-confinement-timeline"}}}) | |
return retriever | |
async def Search(input, categorie): | |
vectorstore = await VectorDatabase(categorie) | |
results = [] | |
test = [] | |
sources_text = "" | |
sources_offres = "" | |
verbatim_text = "" | |
count = 0 | |
countOffres = 0 | |
if categorie == "bibliographie-OPP-DGDIN": | |
search = vectorstore.similarity_search(input,k=50, filter={"categorie": {"$eq": categorie}}) | |
for i in range(0,len(search)): | |
if search[i].metadata['Lien'] not in test: | |
if count <= 15: | |
count = count + 1 | |
test.append(search[i].metadata['Lien']) | |
sources_text = sources_text + str(count) + ". " + search[i].metadata['Titre'] + ', ' + search[i].metadata['Auteurs'] + ', ' + search[i].metadata['Lien'] + "\n" | |
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['Phrase'] + "</p><p> </p>" | |
elif categorie == "year": | |
search = vectorstore.similarity_search(input,k=50, filter={"year": {"$gte": 2019}}) | |
for i in range(0,len(search)): | |
if count <= 15: | |
count = count + 1 | |
sources_text = sources_text + str(count) + ". " + search[i].metadata['title'] + ' (JDLP : ' + str(search[i].metadata['year']) + '), ' + search[i].metadata['author'] + ', https://cipen.univ-gustave-eiffel.fr/fileadmin/CIPEN/OPP/' + search[i].metadata['file'] + "\n" | |
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". JDLP : " + search[i].metadata['jdlp'] + "</p><p style='font-size:0.8rem'>" + search[i].page_content + "</p>" | |
elif categorie == "skills": | |
search = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'competences-master-CFA.csv'}}) | |
searchOffres = vectorstore.similarity_search(input,k=50, filter={"file": {"$eq": 'marche-emploi-CFA.csv'}}) | |
for i in range(0,len(search)): | |
if count <= 15: | |
count = count + 1 | |
sources_text = sources_text + str(count) + ". " + search[i].metadata['diplôme'] + ' (année : ' + search[i].metadata['année'] + '), ' + search[i].metadata['domaine'] + ', https://www.francecompetences.fr/recherche/rncp/' + str(search[i].metadata['rncp'])[4:] + "/\n" | |
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['diplôme'] + "</p><p style='font-size:0.8rem'>" + search[i].page_content + "</p>" | |
for i in range(0,len(searchOffres)): | |
if countOffres <= 15: | |
countOffres = countOffres + 1 | |
sources_offres = sources_offres + str(countOffres) + ". " + searchOffres[i].metadata['Poste'] + " (type de contrat : " + searchOffres[i].metadata['Contrat'] + ")\n" | |
elif categorie == "videosTC": | |
search = vectorstore.similarity_search(input,k=50, filter={"title": {"$eq": "videos-confinement-timeline"}}) | |
for i in range(0,len(search)): | |
if count <= 17: | |
count = count + 1 | |
timeSeq = search[i].metadata["time"] | |
timeSeqRound = round(timeSeq) | |
time = timedelta(seconds=timeSeqRound) | |
sources_text = sources_text + '<div class="gridvid"><a target="_blank" title="' + search[i].metadata['titre'] + ' : ...' + search[i].page_content + '" href="' + search[i].metadata['video'] + '#start=' + str(timeSeq) + '"><img src="' + search[i].metadata['image'] + '" width="100%" alt="' + search[i].metadata['titre'] + ' : ...' + search[i].page_content + '"/><p>🕓 ' + str(time) + ' : ...' + search[i].page_content + ' : ' + search[i].metadata['titre'] + '</p></a></div>' | |
verbatim_text = verbatim_text + "<p style='font-size:0.8rem'>" + str(count) + ". " + search[i].metadata['titre'] + "</p><p style='font-size:0.8rem'>🕓 "+ str(time) + " : " + search[i].page_content + "</p>" | |
results = [sources_text, verbatim_text, sources_offres] | |
return results | |
async def on_chat_start(): | |
await cl.Message(f"> REVIEWSTREAM").send() | |
res = await cl.AskActionMessage( | |
content="<div style='width:100%;text-align:center'>Sélectionnez une source documentaire</div>", | |
actions=[ | |
cl.Action(name="bibliographie-OPP-DGDIN", value="Pédagogie durable", label="🔥 Pédagogie durable : exemple : «quels sont les modèles d'apprentissage dans les universités?»"), | |
cl.Action(name="bibliographie-OPP-DGDIN", value="Lieux d'apprentissage", label="🏫 Lieux d'apprentissage : exemple : «donne des exemples de lieu d'apprentissage dans les universités?»"), | |
cl.Action(name="bibliographie-OPP-DGDIN", value="Espaces d'apprentissage", label="🏠 Espaces d'apprentissage : exemple : «donne des exemples d'agencement d'espaces d'apprentissage dans les universités?»"), | |
cl.Action(name="year", value="Journée de La Pédagogie", label="👨🏫 Journée de La Pédagogie : exemple : «Quelles sont les bonnes pratiques des plateformes de e-learning?»"), | |
cl.Action(name="skills", value="Compétences du CFA Descartes", label="🧑🏻🎓 Les compétences des masters du CFA Descartes : exemple : «Quels sont les Master qui dispensent des compétences en marketing dispensées au CFA?»"), | |
cl.Action(name="OF", value="Formations Gustave Eiffel", label="🎓 Les formations de l'université Gustave Eiffel : exemple : «Quels sont les formations et les métiers possibles si on est créatif?»"), | |
cl.Action(name="videosTC", value="Vidéos paroles de confiné.es", label="📽️ Les vidéos paroles de confiné.es : exemple : «Quelle est la méthodologie employée avec les plateformes d'enseignement à distance?»"), | |
cl.Action(name="offreST", value="Offres d'emploi France Travail", label="💼 Les offres d'emploi de France Travail : exemple : «Quels sont les types de contrat proposés par les recruteurs?»"), | |
cl.Action(name="coderome", value="Catalogue Codes ROME", label="📖 Le catalogue ROME : exemple : «Quels sont les codes ROME liés au marketing?»"), | |
cl.Action(name="vision", value="Caption image and object definition", label="🖼️ Transcription et définition d'objets image : exemple : «Définis les objets et donne une analyse de l'image?»"), | |
cl.Action(name="pdfextraction", value="Lecture de document PDF", label="📃 Extraction de contenus dans document PDF : exemple : «Résume le contenu du document?»"), | |
], | |
timeout="3600" | |
).send() | |
listPrompts_name = f"Liste des revues de recherche" | |
contentPrompts = """<p><img src='/public/hal-logo-header.png' width='32' align='absmiddle' /> <strong> Hal Archives Ouvertes</strong> : Une archive ouverte est un réservoir numérique contenant des documents issus de la recherche scientifique, généralement déposés par leurs auteurs, et permettant au grand public d'y accéder gratuitement et sans contraintes. | |
</p> | |
<p><img src='/public/logo-persee.png' width='32' align='absmiddle' /> <strong>Persée</strong> : offre un accès libre et gratuit à des collections complètes de publications scientifiques (revues, livres, actes de colloques, publications en série, sources primaires, etc.) associé à une gamme d'outils de recherche et d'exploitation.</p> | |
""" | |
prompt_elements = [] | |
prompt_elements.append( | |
cl.Text(content=contentPrompts, name=listPrompts_name, display="side") | |
) | |
await cl.Message(content="📚 " + listPrompts_name, elements=prompt_elements).send() | |
settings = await cl.ChatSettings( | |
[ | |
Select( | |
id="Model", | |
label="Publications de recherche", | |
values=["---", "HAL", "Persée"], | |
initial_index=0, | |
), | |
] | |
).send() | |
if res: | |
await cl.Message(f"Vous pouvez requêter sur la thématique : {res.get('value')}").send() | |
cl.user_session.set("selectRequest", res.get("name")) | |
model = await LLModel() | |
retriever = await Retriever(cl.user_session.get("selectRequest")) | |
########## Chain with streaming ########## | |
message_history = ChatMessageHistory() | |
memory = ConversationBufferMemory(memory_key="chat_history",output_key="answer",chat_memory=message_history,return_messages=True) | |
qa = ConversationalRetrievalChain.from_llm( | |
model, | |
memory=memory, | |
chain_type="stuff", | |
return_source_documents=True, | |
verbose=False, | |
retriever=retriever | |
) | |
cl.user_session.set("runnable", qa) | |
cl.user_session.set("memory", memory) | |
async def on_message(message: cl.Message): | |
memory = cl.user_session.get("memory") | |
runnable = cl.user_session.get("runnable") # type: Runnable | |
msg = cl.Message(content="") | |
class PostMessageHandler(BaseCallbackHandler): | |
""" | |
Callback handler for handling the retriever and LLM processes. | |
Used to post the sources of the retrieved documents as a Chainlit element. | |
""" | |
def __init__(self, msg: cl.Message): | |
BaseCallbackHandler.__init__(self) | |
self.msg = msg | |
self.sources = set() # To store unique pairs | |
def on_retriever_end(self, documents, *, run_id, parent_run_id, **kwargs): | |
for d in documents: | |
source_page_pair = (d.metadata['source'], d.metadata['page']) | |
self.sources.add(source_page_pair) # Add unique pairs to the set | |
def on_llm_end(self, response, *, run_id, parent_run_id, **kwargs): | |
sources_text = "\n".join([f"{source}#page={page}" for source, page in self.sources]) | |
self.msg.elements.append( | |
cl.Text(name="Sources", content=sources_text, display="inline") | |
) | |
with async_literal_client.thread() as thread: | |
cb = cl.AsyncLangchainCallbackHandler() | |
results = await runnable.acall("Contexte : Vous êtes un chercheur de l'enseignement supérieur et vous êtes doué pour faire des analyses d'articles de recherche sur les thématiques liées à la pédagogie, en fonction des critères définis ci-avant. En fonction des informations suivantes et du contexte suivant seulement et strictement, répondez en langue française strictement à la question ci-dessous, en 5500 mots minimum. En plus, tu créeras et tu afficheras, à la fin de ta réponse, 3 questions supplémentaires en relation avec le contexte initial, à chaque étape de la conversation. Tu écriras et tu afficheras les 3 questions supplémentaires en relation avec le contexte initial, à la fin de ta réponse, avec un titrage de niveau 1 qui a pour titre \"Questions en relation avec le contexte : \". Lorsque cela est possible, cite les sources du contexte. Si vous ne pouvez pas répondre à la question sur la base des informations, dites que vous ne trouvez pas de réponse ou que vous ne parvenez pas à trouver de réponse. Essayez donc de comprendre en profondeur le contexte et répondez uniquement en vous basant sur les informations fournies. Ne générez pas de réponses non pertinentes. Question : " + message.content, callbacks=[cb]) | |
answer = results["answer"] | |
await cl.Message(content=GoogleTranslator(source='auto', target='fr').translate(answer)).send() | |
#search = vectorstore.similarity_search(message.content,k=50, filter={"categorie": {"$eq": "bibliographie-OPP-DGDIN"}}) | |
search = await Search(message.content, cl.user_session.get("selectRequest")) | |
#os.environ["GOOGLE_CSE_ID"] = os.getenv('GOOGLE_CSE_ID') | |
#os.environ["GOOGLE_API_KEY"] = os.getenv('GOOGLE_API_KEY') | |
#searchAPI = GoogleSearchAPIWrapper() | |
#def top5_results(query): | |
# return searchAPI.results(query, 5) | |
#tool = Tool( | |
# name="Google Search Snippets", | |
# description="Search Google for recent results.", | |
# func=top5_results, | |
#) | |
#query = str(message.content) | |
#ref_text = tool.run(query) | |
#if 'Result' not in ref_text[0].keys(): | |
# print(ref_text) | |
#else: | |
# print('None') | |
sources = [ | |
cl.Text(name="Sources", content=search[0], display="inline") | |
] | |
await cl.Message( | |
content="Sources : ", | |
elements=sources, | |
).send() | |
if search[2]: | |
sourcesOffres = [ | |
cl.Text(name="Exemples d'offres d'emploi", content=search[2], display="inline") | |
] | |
await cl.Message( | |
content="Offres d'emploi : ", | |
elements=sourcesOffres, | |
).send() | |
verbatim = [ | |
cl.Text(name="Verbatim", content=search[1], display="side") | |
] | |
await cl.Message( | |
content="📚 Liste des Verbatim ", | |
elements=verbatim, | |
).send() |