import pandas as pd
from datasets import Dataset
from transformers import pipeline, GPT2Tokenizer
from sentence_transformers import SentenceTransformer, util

# File paths and model identifiers used throughout the script.
filename = "output_topic_details.txt"
retrieval_model_name = 'output/sentence-transformer-finetuned/'
gpt2_model_name = "gpt2"
csv_file_path = "train_dataset.csv"
output_csv_file_path = "updated_train_dataset.csv"
val_csv_file_path = "val_dataset.csv"
output_val_csv_file_path = "updated_val_dataset.csv"

# The tokenizer measures prompt length when setting the generation budget below.
tokenizer = GPT2Tokenizer.from_pretrained(gpt2_model_name)

try:
    retrieval_model = SentenceTransformer(retrieval_model_name)
    gpt_model = pipeline("text-generation", model=gpt2_model_name)
    print("Models loaded successfully.")
except Exception as e:
    print(f"Failed to load models: {e}")
    raise  # Nothing below can run without the models; fail fast with context.


def load_and_preprocess_text(filename):
    """
    Load and preprocess text data from a file.

    Parameters:
    - filename (str): Path to the text file.

    Returns:
    - list[str]: A list of preprocessed text segments.
    """
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            # One segment per non-empty line, stripped of surrounding whitespace.
            segments = [line.strip() for line in file if line.strip()]
        print("Text loaded and preprocessed successfully.")
        return segments
    except Exception as e:
        print(f"Failed to load or preprocess text: {e}")
        return []


segments = load_and_preprocess_text(filename)


def find_relevant_segment(user_query, segments):
    """
    Find the most relevant text segment based on a user query.

    Parameters:
    - user_query (str): The user's query.
    - segments (list[str]): List of text segments to search within.

    Returns:
    - str: The most relevant text segment.
    """
    try:
        # Embed the query and every segment, then return the segment whose
        # embedding is most cosine-similar to the query embedding.
        query_embedding = retrieval_model.encode(user_query)
        segment_embeddings = retrieval_model.encode(segments)
        similarities = util.pytorch_cos_sim(query_embedding, segment_embeddings)[0]
        best_idx = similarities.argmax()
        return segments[best_idx]
    except Exception as e:
        print(f"Error finding relevant segment: {e}")
        return ""


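# find_relevant_segment re-encodes every segment on each call. When the segment
# list is static, caching the segment embeddings once avoids that repeated work.
# Minimal sketch (hypothetical helper, not used elsewhere in this script):
_segment_embeddings_cache = None

def find_relevant_segment_cached(user_query, segments):
    """Like find_relevant_segment, but encodes the segments only once."""
    global _segment_embeddings_cache
    if _segment_embeddings_cache is None:
        _segment_embeddings_cache = retrieval_model.encode(segments)
    query_embedding = retrieval_model.encode(user_query)
    similarities = util.pytorch_cos_sim(query_embedding, _segment_embeddings_cache)[0]
    return segments[similarities.argmax()]

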
def generate_response(question):
    """
    Generate a response to a given question by finding a relevant text segment
    and using it to generate a more complete answer.

    Parameters:
    - question (str): The user's question.

    Returns:
    - str: Generated response.
    """
    relevant_segment = find_relevant_segment(question, segments)
    return generate_response_with_context(question, relevant_segment)


def generate_response_with_context(user_query, relevant_segment):
    """
    Generate a response based on a user query and a relevant segment.

    Parameters:
    - user_query (str): The user's query.
    - relevant_segment (str): A relevant fact or detail.

    Returns:
    - str: Formatted response incorporating the relevant segment.
    """
    try:
        prompt = f"Thank you for your question! Here is an additional fact about your topic: {relevant_segment}"
        # max_length counts prompt tokens too, so budget the prompt plus 50 new tokens.
        max_tokens = len(tokenizer(prompt)['input_ids']) + 50
        # temperature only takes effect when sampling is enabled, hence do_sample=True.
        response = gpt_model(prompt, max_length=max_tokens, do_sample=True,
                             temperature=0.25)[0]['generated_text']
        return clean_up_response(response, relevant_segment)
    except Exception as e:
        print(f"Error generating response: {e}")
        return ""


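# Illustrative flow (query hypothetical): generate_response("What do pandas eat?")
# retrieves the best-matching segment, builds the prompt above around it, and
# returns GPT-2's cleaned continuation of that prompt.

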
def clean_up_response(response, segment):
    """
    Clean up the generated response to ensure it is tidy and presentable.

    Parameters:
    - response (str): The initial response generated by the model.
    - segment (str): The segment used to generate the response.

    Returns:
    - str: A cleaned and formatted response.
    """
    # Drop empty sentences and sentences that merely echo the prompt's segment,
    # then re-join and make sure the result ends with terminal punctuation.
    sentences = response.split('.')
    cleaned_sentences = [sentence.strip() for sentence in sentences
                         if sentence.strip() and sentence.strip() not in segment]
    cleaned_response = '. '.join(cleaned_sentences).strip()
    if cleaned_response and not cleaned_response.endswith((".", "!", "?")):
        cleaned_response += "."
    return cleaned_response


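# Worked example (illustrative strings):
#   clean_up_response("The Nile is long. The Nile is long. It flows north",
#                     "The Nile is long")
#   -> "It flows north."  (sentences echoing the segment are dropped)

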
def process_dataset(csv_file_path, output_csv_file_path):
    """
    Process the dataset by generating responses and evaluating their similarities.

    Parameters:
    - csv_file_path (str): Path to the CSV file containing the dataset.
    - output_csv_file_path (str): Path where the updated dataset will be saved.

    Prints:
    - Path to the saved results and the average similarity score.
    """
    df = pd.read_csv(csv_file_path)
    dataset = Dataset.from_pandas(df)
    updated_dataset = add_model_answers(dataset)
    similarities = evaluate_similarity(updated_dataset)
    updated_dataset = updated_dataset.add_column("similarity", similarities)
    results_df = updated_dataset.to_pandas()
    results_df.to_csv(output_csv_file_path, index=False)
    # Guard against division by zero on an empty dataset.
    average_similarity = sum(similarities) / len(similarities) if similarities else 0
    print(f"Results saved to {output_csv_file_path}")
    print(f"Average Similarity Score: {average_similarity:.3f}")


def add_model_answers(dataset):
    """
    Add generated answers to the dataset.

    Parameters:
    - dataset (datasets.Dataset): The Hugging Face dataset object.

    Returns:
    - datasets.Dataset: Updated dataset with added answers.
    """
    # Expects a 'Question' column; stores generated text in a new 'Answer' column.
    answers = [generate_response(q) for q in dataset['Question']]
    dataset = dataset.add_column("Answer", answers)
    return dataset


def evaluate_similarity(dataset):
    """
    Evaluate the similarity of generated answers against ground truth answers.

    Parameters:
    - dataset (datasets.Dataset): The dataset containing both answers and ground truths.

    Returns:
    - list[float]: List of similarity scores.
    """
    similarities = [util.pytorch_cos_sim(retrieval_model.encode(ans),
                                         retrieval_model.encode(gt))[0][0].item()
                    for ans, gt in zip(dataset['Answer'], dataset['GroundTruth'])]
    return similarities


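# The comprehension above encodes one answer/ground-truth pair per iteration.
# For larger datasets, encoding each column once and reading the diagonal of
# the similarity matrix computes the same scores with far fewer encode calls.
# Minimal sketch (hypothetical helper, not called elsewhere in this script):
def evaluate_similarity_batched(dataset):
    """Batched equivalent of evaluate_similarity."""
    answer_embeddings = retrieval_model.encode(dataset['Answer'])
    truth_embeddings = retrieval_model.encode(dataset['GroundTruth'])
    similarity_matrix = util.pytorch_cos_sim(answer_embeddings, truth_embeddings)
    # Entry [i][i] pairs answer i with ground truth i.
    return [similarity_matrix[i][i].item() for i in range(len(dataset['Answer']))]

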
# Generate answers and similarity scores for the training and validation splits.
process_dataset(csv_file_path, output_csv_file_path)
process_dataset(val_csv_file_path, output_val_csv_file_path)