# -*- coding:utf-8 -*-
from __future__ import annotations

import json
import logging
import os
import platform
import re
from pathlib import Path

import requests
import torch
from tqdm import tqdm
from langchain.memory import ConversationSummaryBufferMemory


class LogRecord(logging.LogRecord):
    def getMessage(self):
        msg = self.msg
        if self.args:
            if isinstance(self.args, dict):
                msg = msg.format(**self.args)
            else:
                msg = msg.format(*self.args)
        return msg


class Logger(logging.Logger):
    def makeRecord(
        self,
        name,
        level,
        fn,
        lno,
        msg,
        args,
        exc_info,
        func=None,
        extra=None,
        sinfo=None,
    ):
        rv = LogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo)
        if extra is not None:
            for key in extra:
                rv.__dict__[key] = extra[key]
        return rv


def init_settings():
    logging.setLoggerClass(Logger)
    logging.basicConfig(
        level=logging.WARNING,
        format="%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s",
    )


def remove_extra_spaces(text):
    return re.sub(" +", " ", text.strip())


def print_llm_response(llm_response, debug_retrieval=True):
    answer = llm_response["answer"] if "answer" in llm_response else None
    if answer is None:
        answer = llm_response["response"] if "response" in llm_response else None

    if answer is not None:
        print("\n\n***Answer:")
        print(answer)

    source_documents = (
        llm_response["source_documents"]
        if "source_documents" in llm_response
        else None
    )
    if source_documents is None:
        source_documents = (
            llm_response["sourceDocs"] if "sourceDocs" in llm_response else None
        )

    if debug_retrieval and source_documents is not None:
        print("\nSources:")
        for index, source in enumerate(source_documents):
            metadata = source["metadata"] if "metadata" in source else source.metadata
            if "page" in metadata:
                print(f" Page: {metadata['page']}", end="")

            print(
                f" Source {index + 1}: "
                + str(metadata["url"] if "url" in metadata else metadata["source"])
            )
            print(
                source["page_content"]
                if "page_content" in source
                else source.page_content
            )

    if "chat_history" in llm_response:
        print("\nChat History:")
        print(llm_response["chat_history"])
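
# Illustrative input shape only (an assumption, not taken from any caller in this
# file): print_llm_response accepts either LangChain documents (objects with
# .metadata / .page_content) or plain dicts carrying the same keys, e.g.
#
#   print_llm_response(
#       {
#           "answer": "42",
#           "source_documents": [
#               {"metadata": {"source": "doc.txt", "page": 3}, "page_content": "..."}
#           ],
#       }
#   )
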
def get_device_types():
    print("Running on: ", platform.platform())
    print("MPS is", "NOT" if not torch.backends.mps.is_available() else "", "available")
    print("CUDA is", "NOT" if not torch.cuda.is_available() else "", "available")
    device_type_available = "cpu"
    if not torch.backends.mps.is_available():
        if not torch.backends.mps.is_built():
            print(
                "MPS not available because the current PyTorch install was not "
                "built with MPS enabled."
            )
        else:
            print(
                "MPS not available because the current macOS version is not 12.3+ "
                "and/or you do not have an MPS-enabled device on this machine."
            )
    else:
        device_type_available = "mps"

    if torch.cuda.is_available():
        print("CUDA is available, we have found ", torch.cuda.device_count(), " GPU(s)")
        print(torch.cuda.get_device_name(0))
        print("CUDA version: " + torch.version.cuda)
        device_type_available = f"cuda:{torch.cuda.current_device()}"

    return (
        os.environ.get("HF_EMBEDDINGS_DEVICE_TYPE") or device_type_available,
        os.environ.get("HF_PIPELINE_DEVICE_TYPE") or device_type_available,
    )


def ensure_model_is_downloaded(llm_model_type):
    if llm_model_type.startswith("gpt4all"):
        local_path = (
            os.environ.get("GPT4ALL_J_MODEL_PATH")
            if llm_model_type == "gpt4all-j"
            else os.environ.get("GPT4ALL_MODEL_PATH")
        )
        url = (
            os.environ.get("GPT4ALL_J_DOWNLOAD_LINK")
            if llm_model_type == "gpt4all-j"
            else os.environ.get("GPT4ALL_DOWNLOAD_LINK")
        )
    elif llm_model_type == "llamacpp":
        local_path = os.environ.get("LLAMACPP_MODEL_PATH")
        url = os.environ.get("LLAMACPP_DOWNLOAD_LINK")
    elif llm_model_type == "ctransformers":
        local_path = os.environ.get("CTRANSFORMERS_MODEL_PATH")
        url = os.environ.get("CTRANSFORMERS_DOWNLOAD_LINK")
    else:
        raise ValueError(f"wrong model type: {llm_model_type}")

    path = Path(local_path)
    if path.is_file():
        print(f"model: {local_path} exists")
    else:
        print(f"downloading model: {local_path} from {url} ...")
        path.parent.mkdir(parents=True, exist_ok=True)

        # send a GET request to the URL to download the file. Stream since it's large
        response = requests.get(url, stream=True)

        # open the file in binary mode and write the contents of the response to it in chunks
        # This is a large file, so be prepared to wait.
        with open(local_path, "wb") as f:
            for chunk in tqdm(response.iter_content(chunk_size=8192)):
                if chunk:
                    f.write(chunk)

    return local_path


class CustomizedConversationSummaryBufferMemory(ConversationSummaryBufferMemory):
    def save_context(self, inputs, outputs) -> None:
        for key in outputs:
            if isinstance(outputs[key], str):
                outputs[key] = outputs[key].replace("<|im_end|>", "")
        return super().save_context(inputs, outputs)

    def predict_new_summary(self, messages, existing_summary) -> str:
        return (
            super()
            .predict_new_summary(messages, existing_summary)
            .replace("<|im_end|>", "")
        )


def CalculateDistance(entry1, entry2, distance_calculator):
    if entry1 == entry2:
        return 0
    distance = distance_calculator.evaluate_string_pairs(
        prediction=entry1, prediction_b=entry2
    )
    # print(f"entry1: {entry1}, entry2: {entry2}, distance: {distance['score']}")
    return distance["score"]


def FindInList(entry, elist, distance_calculator=None, debug=False):
    for item in elist:
        if distance_calculator is not None:
            distance = CalculateDistance(entry, item, distance_calculator)
            if distance < distance_threshold:
                if debug:
                    print(
                        f"FindInList - matched by distance {distance:.3f}: {entry} - {item}"
                    )
                return True

        if entry == item:
            return True

    return False
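
# Note (assumption inferred from CalculateDistance and the __main__ self-test below):
# `distance_calculator` is expected to be a LangChain pairwise string evaluator whose
# evaluate_string_pairs(prediction=..., prediction_b=...) returns a dict with a
# "score" key, e.g. one created via
#
#   from langchain.evaluation import load_evaluator
#   hf_evaluator = load_evaluator("pairwise_embedding_distance", embeddings=embeddings)
#
# Two entries are treated as a match when the score falls below the module-level
# `distance_threshold` (configurable through the DISTANCE_THRESHOLD env var).
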
def CalculatePRF1F2(
    goldAnswerList, predAnswerList, distance_calculator=None, debug=False
):
    if len(goldAnswerList) == 0:
        if len(predAnswerList) == 0:
            return [
                1.0,
                1.0,
                1.0,
                1.0,
            ]  # consider it 'correct' when there is no labeled answer, and also no predicted answer
        else:
            return [
                0.0,
                1.0,
                0.0,
                0.0,
            ]  # precision=0 and recall=1 when there is no labeled answer, but some predicted answer(s)
    elif len(predAnswerList) == 0:
        return [
            1.0,
            0.0,
            0.0,
            0.0,
        ]  # precision=1 and recall=0 when there is labeled answer(s), but no predicted answer
    else:
        glist = goldAnswerList
        plist = predAnswerList

        tp = 1e-40  # numerical trick to avoid division by zero
        fp = 0.0
        fn = 0.0

        for gentry in glist:
            if FindInList(
                gentry,
                plist,
                distance_calculator=distance_calculator,
                debug=debug,
            ):
                tp += 1
            else:
                fn += 1
        for pentry in plist:
            if not FindInList(pentry, glist, distance_calculator=distance_calculator):
                fp += 1

        precision = tp / (tp + fp)
        recall = tp / (tp + fn)

        f1 = (2 * precision * recall) / (precision + recall)
        f2 = (5 * precision * recall) / (4 * precision + recall)
        return [precision, recall, f1, f2]


nlp = None
distance_threshold = 0.05


def load_spacy_model():
    import spacy

    global nlp
    if nlp is not None:
        return nlp

    global distance_threshold
    distance_threshold = float(os.getenv("DISTANCE_THRESHOLD", "0.05"))

    spacy_model_name = os.getenv("SPACY_MODEL_NAME", "en_core_web_trf")
    while True:
        try:
            print(f"loading spacy model from {spacy_model_name}")
            nlp = spacy.load(spacy_model_name)
            print(f"loaded spacy model from {spacy_model_name}")
            return nlp
        except OSError:
            print(f"downloading spacy model {spacy_model_name}")
            spacy.cli.download(spacy_model_name)
            print(f"downloaded spacy model {spacy_model_name}")


def clean_text(text):
    text = text.lower()
    text = text.replace('"', "")
    text = text.replace(".", "")
    # text = text.replace("ō", "o")
    return text


def get_entities_in_text(text, debug=False):
    nlp = load_spacy_model()
    doc = nlp(text)
    entities_in_text = []
    for word in doc.ents:
        if debug:
            print(word.text, word.label_)
        entity = clean_text(word.text)
        if entity not in entities_in_text:
            entities_in_text.append(entity)

    entities_in_text.sort()
    return entities_in_text


def calculate_metrics(question, answer, distance_calculator=None, debug=False):
    ground_truth = question["answers"]
    ground_truth.sort()

    if debug:
        print(f"question: {question}")
        print(f"answer: {answer}")

        print("entities_in_question ---------------")
        entities_in_question = get_entities_in_text(question["question"], debug)
        print("entities_in_answer -----------------")
        entities_in_answer = get_entities_in_text(answer, debug)
        print("done with NER with spaCy -----------")

        entities_in_answer.sort()

        predAnswerList = [
            pentry
            for pentry in entities_in_answer
            if not FindInList(pentry, entities_in_question)
        ]

        print(f"entities_in_question: {entities_in_question}")
        print(f"entities_in_answer: {entities_in_answer}")
        print(f"ground_truth: {ground_truth}")
        print(f"pred_answers: {predAnswerList}")

        precision, recall, f1, f2 = CalculatePRF1F2(
            ground_truth,
            predAnswerList,
            debug=debug,
            distance_calculator=distance_calculator,
        )
        print(f"precision: {precision}, recall: {recall}, f1: {f1}, f2: {f2}")
    else:
        precision = 0.0
        recall = 0.0
        f1 = 0.0
        f2 = 0.0
        entities_in_answer = []
        entities_in_question = []

    return (
        precision,
        recall,
        f1,
        f2,
        entities_in_answer,
        ground_truth,
        entities_in_question,
    )


def calculate_metrics_gemini(question, answer, debug=False):
    precision = 0.0
    recall = 0.0
    f1 = 0.0
    return (precision, recall, f1)
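
# Worked example for CalculatePRF1F2 above (exact-match path, i.e. with
# distance_calculator=None), purely illustrative:
#   gold = ["japan", "okuma"], pred = ["japan"]
#   -> tp ~= 1, fn = 1, fp = 0
#   -> precision = 1.0, recall = 0.5, f1 ~= 0.667, f2 ~= 0.556
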
if __name__ == "__main__":
    from langchain_community.embeddings import HuggingFaceInstructEmbeddings
    from langchain.evaluation import load_evaluator

    hf_embeddings_device_type, hf_pipeline_device_type = get_device_types()
    print(f"hf_embeddings_device_type: {hf_embeddings_device_type}")
    print(f"hf_pipeline_device_type: {hf_pipeline_device_type}")

    hf_embeddings_model_name = "hkunlp/instructor-large"
    print(f"hf_embeddings_model_name: {hf_embeddings_model_name}")
    embeddings = HuggingFaceInstructEmbeddings(
        model_name=hf_embeddings_model_name,
        model_kwargs={"device": hf_embeddings_device_type},
    )
    hf_evaluator = load_evaluator("pairwise_embedding_distance", embeddings=embeddings)

    question = {
        "question": "what does jamaican people speak",
        "entities_in_question": ["jamaican"],
        "answers": ["jamaican english", "jamaican creole english language"],
    }
    answer = "Jamaican people primarily speak Jamaican Patois, which is an English-based creole language with significant West African influences. It is spoken as a native language by the majority of Jamaicans and also exists in various forms among Jamaican expatriates and non-Jamaicans in different parts of the world. The phonology of Jamaican Patois includes around 21 consonants (with some dialectal variation regarding the status of /h/ as a phoneme) and between nine and sixteen vowels, some of which are capable of nasalization or lengthening. There are also instances of palatalization in Jamaican Patois, where certain consonants appear to be phonemic in some dialects but may be considered phonetic in others. For example, the palatal stops [c], [ɟ], and [ɲ] may be analyzed as phonemes or as instances of phonetic palatalization depending on the account."
    calculate_metrics(question, answer, distance_calculator=hf_evaluator, debug=True)

    question = {
        "question": "who is governor of ohio 2011",
        "entities_in_question": ["2011"],
        "answers": ["john kasich", "return j. meigs, jr.", "ted strickland"],
    }
    answer = "The lieutenant governor of Ohio in 2011 was Mary Taylor, who served alongside Governor John Kasich. She assumed office on January 10, 2011, after being elected as the lieutenant governor in the 2010 election. During her tenure, she faced criticism for using the state airplane for personal errands and reportedly had high turnover among her staff."
    calculate_metrics(question, answer, distance_calculator=hf_evaluator, debug=True)

    question = {
        "question": "where is the fukushima daiichi nuclear power station",
        "entities_in_question": ["the fukushima daiichi nuclear power station"],
        "answers": ["japan", "okuma"],
    }
    answer = "The Fukushima Daiichi Nuclear Power Station is located in the towns of Ōkuma and Futaba in Fukushima Prefecture, Japan."
    calculate_metrics(question, answer, distance_calculator=hf_evaluator, debug=True)