# , get_pinecone_vectorstore, find_similar_vectors from climateqa.engine.vectorstore import build_vectores_stores, get_PDF_Names_from_GCP, get_categories_files from climateqa.engine.text_retriever import ClimateQARetriever from climateqa.engine.rag import make_rag_chain from climateqa.engine.llm import get_llm from utils import create_user_id from datetime import datetime import json import re import gradio as gr from sentence_transformers import CrossEncoder reranker = CrossEncoder("mixedbread-ai/mxbai-rerank-xsmall-v1") # Load environment variables in local mode try: from dotenv import load_dotenv load_dotenv() except Exception as e: pass # Set up Gradio Theme theme = gr.themes.Soft( primary_hue="yellow", secondary_hue="orange", font=[gr.themes.GoogleFont("Poppins"), "ui-sans-serif", "system-ui", "sans-serif"], ) init_prompt = "" system_template = { "role": "system", "content": init_prompt, } user_id = create_user_id() list_categorie = get_categories_files() categories=list_categorie["AllCat"] def parse_output_llm_with_sources(output): # Split the content into a list of text and "[Doc X]" references content_parts = re.split(r'\[(Doc\s?\d+(?:,\s?Doc\s?\d+)*)\]', output) parts = [] for part in content_parts: if part.startswith("Doc"): subparts = part.split(",") subparts = [subpart.lower().replace("doc", "").strip() for subpart in subparts] subparts = [f"""({ subpart})""" for subpart in subparts] parts.append("".join(subparts)) else: parts.append(part) content_parts = "".join(parts) return content_parts def serialize_docs(docs): new_docs = [] for doc in docs: new_doc = {} new_doc["page_content"] = doc.page_content new_doc["metadata"] = doc.metadata new_docs.append(new_doc) return new_docs # Create vectorstore and retriever vectorstore = build_vectores_stores("./sources") llm = get_llm(provider="openai", max_tokens=1024, temperature=0.0) async def chat(query, history, categories, src_nb_max, src_pertinence): """taking a query and a message history, use a pipeline (reformulation, retriever, answering) to yield a tuple of: (messages in gradio format, messages in langchain format, source documents)""" print(f">> NEW QUESTION : {query} -> sources max:{src_nb_max} - pertience: {src_pertinence}") filter = None if len(categories): filter={ "$or" : [] } for cat in categories: for fich in list_categorie[cat]: filter["$or"].append({"ax_name": fich}) print( ">> Filter :" + str(filter) ) print( ">> nb sources :" + str(src_nb_max) ) print( ">> pertinence :" + str(src_pertinence) ) retriever = ClimateQARetriever( vectorstore=vectorstore, sources=["Custom"], reports=[], threshold=src_pertinence, k_total=src_nb_max, filter=filter ) rag_chain = make_rag_chain(retriever, llm) inputs = {"query": query, "audience": None} result = rag_chain.astream_log(inputs) path_reformulation = "/logs/reformulation/final_output" path_keywords = "/logs/keywords/final_output" path_retriever = "/logs/find_documents/final_output" path_answer = "/logs/answer/streamed_output_str/-" docs_html = "" output_query = "" output_language = "" output_keywords = "" gallery = [] try: async for op in result: op = op.ops[0] if op['path'] == path_reformulation: # reforulated question try: output_language = op['value']["language"] # str output_query = op["value"]["question"] except Exception as e: raise gr.Error(f"ClimateQ&A Error: {e} - The error has been noted, try another question and if the error remains, you can contact us :)") if op["path"] == path_keywords: try: output_keywords = op['value']["keywords"] # str output_keywords = " AND ".join(output_keywords) except Exception as e: pass elif op['path'] == path_retriever: # documents try: docs = op['value']['docs'] # List[Document] docs_html = [] for i, d in enumerate(docs, 1): docs_html.append(make_html_source(d, i)) docs_html = "".join(docs_html) except TypeError: print("No documents found") print("op: ", op) continue elif op['path'] == path_answer: # final answer new_token = op['value'] # str # time.sleep(0.01) previous_answer = history[-1][1] previous_answer = previous_answer if previous_answer is not None else "" answer_yet = previous_answer + new_token answer_yet = parse_output_llm_with_sources(answer_yet) history[-1] = (query, answer_yet) else: continue history = [tuple(x) for x in history] yield history, docs_html, output_query, output_language, gallery, output_query, output_keywords except Exception as e: raise gr.Error(f"{e}") timestamp = str(datetime.now().timestamp()) log_file = "logs/" + timestamp + ".json" prompt = history[-1][0] logs = { "user_id": str(user_id), "prompt": prompt, "query": prompt, "question": output_query, "sources": ["Custom"], "docs": serialize_docs(docs), "answer": history[-1][1], "time": timestamp, } #log_locally(log_file, logs) yield history, docs_html, output_query, output_language, gallery, output_query, output_keywords def make_html_source(source, i): # Prépare le contenu HTML pour un fichier texte text_content = source.page_content.strip() meta = source.metadata # Nom de la source name = f"Document {i}" # Contenu HTML de la carte card = f"""
Sources
") # slider = gr.Slider(1, 10, value=src_nb_max, step=1, label="nb max", interactive=True, elem_id="source-nb-max") # slider_p = gr.Slider(0.0, 1.0, value=src_pertinence, step=0.01, label="pertinence", interactive=True, elem_id="source-pertinence") # sources_textbox = gr.HTML( # show_label=False, elem_id="sources-textbox") # docs_textbox = gr.State("") # l'object tabs est necessaire actuellement # J'ai l'impression qu'il est utiliser pour freezre les contenu des tabs # pendant que l'ia gènère une reponse .. with gr.Tabs() as tabs: # None with gr.Tab("sources"): sources_textbox = gr.HTML( show_label=False, elem_id="sources-textbox") docs_textbox = gr.State("") with gr.Tab("filtres"): cat_sel = gr.CheckboxGroup(categories,label="Catégories") slider = gr.Slider(1, 10, value=7, step=1, label="nb max", interactive=True, elem_id="source-nb-max") slider_p = gr.Slider(0.0, 1.0, value=0.5, step=0.01, label="pertinence", interactive=True, elem_id="source-pertinence") # TAB A propos with gr.Tab("À propos", elem_classes="max-height other-tabs"): with gr.Row(): with gr.Column(scale=1): gr.Markdown( ("CLARA (Climate LLM for Adaptation & Risks Answers) by [Axionable](https://www.axionable.com/)" "– Fork de [ClimateQ&A](https://huggingface.co/spaces/Ekimetrics/climate-question-answering/tree/main)"), elem_classes="a-propos") # # TAB Configuration # with gr.Tab("Configuration"): # # with gr.Row(elem_id="config-row"): # with gr.Column(scale=1): # # for pdfName in get_PDF_Names_from_GCP(): # gr.Markdown( pdfName, elem_classes="a-propos") def start_chat(query, history): history = history + [(query, None)] history = [tuple(x) for x in history] return (gr.update(interactive=False), gr.update(selected=1), history) def finish_chat(): return (gr.update(interactive=True, value="")) (textbox .submit(start_chat, [textbox, chatbot], [textbox, tabs, chatbot], queue=False, api_name="start_chat_textbox") .then(chat, [textbox, chatbot, cat_sel, slider, slider_p], [chatbot, sources_textbox], concurrency_limit=8, api_name="chat_textbox") .then(finish_chat, None, [textbox], api_name="finish_chat_textbox") ) demo.queue() demo.launch(allowed_paths=["assets/download.png", "assets/logo4.png", "assets/axionable.svg"],favicon_path="assets/logo4.png")