# pylint: disable=missing-timeout from typing import List, Dict, Any, Tuple, Optional import os import logging from time import sleep import requests import streamlit as st API_ENDPOINT = os.getenv("API_ENDPOINT", "http://localhost:8000") STATUS = "initialized" HS_VERSION = "hs_version" DOC_REQUEST = "query" DOC_FEEDBACK = "feedback" DOC_UPLOAD = "file-upload" def haystack_is_ready(): """ Used to show the "Haystack is loading..." message """ url = f"{API_ENDPOINT}/{STATUS}" try: if requests.get(url).status_code < 400: return True except Exception as e: logging.exception(e) sleep(1) # To avoid spamming a non-existing endpoint at startup return False def haystack_version(): """ Get the Haystack version from the REST API """ url = f"{API_ENDPOINT}/{HS_VERSION}" return requests.get(url, timeout=0.1).json()["hs_version"] def query(query, filters={}, top_k_reader=5, top_k_retriever=5) -> Tuple[List[Dict[str, Any]], Dict[str, str]]: """ Send a query to the REST API and parse the answer. Returns both a ready-to-use representation of the results and the raw JSON. """ url = f"{API_ENDPOINT}/{DOC_REQUEST}" params = {"filters": filters, "Retriever": {"top_k": top_k_retriever}, "Reader": {"top_k": top_k_reader}} req = {"query": query, "params": params} response_raw = requests.post(url, json=req) if response_raw.status_code >= 400 and response_raw.status_code != 503: raise Exception(f"{vars(response_raw)}") response = response_raw.json() if "errors" in response: raise Exception(", ".join(response["errors"])) # Format response results = [] answers = response["answers"] for answer in answers: if answer.get("answer", None): results.append( { "context": "..." + answer["context"] + "...", "answer": answer.get("answer", None), "source": answer["meta"]["name"], "relevance": round(answer["score"] * 100, 2), "document": [doc for doc in response["documents"] if doc["id"] in answer["document_ids"]][0], "offset_start_in_doc": answer["offsets_in_document"][0]["start"], "_raw": answer, } ) else: results.append( { "context": None, "answer": None, "document": None, "relevance": round(answer["score"] * 100, 2), "_raw": answer, } ) return results, response def send_feedback(query, answer_obj, is_correct_answer, is_correct_document, document) -> None: """ Send a feedback (label) to the REST API """ url = f"{API_ENDPOINT}/{DOC_FEEDBACK}" req = { "query": query, "document": document, "is_correct_answer": is_correct_answer, "is_correct_document": is_correct_document, "origin": "user-feedback", "answer": answer_obj, } response_raw = requests.post(url, json=req) if response_raw.status_code >= 400: raise ValueError(f"An error was returned [code {response_raw.status_code}]: {response_raw.json()}") def upload_doc(file): url = f"{API_ENDPOINT}/{DOC_UPLOAD}" files = [("files", file)] response = requests.post(url, files=files).json() return response def get_backlink(result) -> Tuple[Optional[str], Optional[str]]: if result.get("document", None): doc = result["document"] if isinstance(doc, dict): if doc.get("meta", None): if isinstance(doc["meta"], dict): if doc["meta"].get("url", None) and doc["meta"].get("title", None): return doc["meta"]["url"], doc["meta"]["title"] return None, None