import os
import csv
import shutil
import time

import openai
import pandas as pd
import numpy as np
from transformers import GPT2TokenizerFast
from dotenv import load_dotenv

# Heavily derived from OpenAI's cookbook example

load_dotenv()

# the dir is the ./playground directory
REPOSITORY_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "playground")


class Embeddings:
    def __init__(self, workspace_path: str):
        self.workspace_path = workspace_path
        openai.api_key = os.getenv("OPENAI_API_KEY", "")

        self.DOC_EMBEDDINGS_MODEL = "text-embedding-ada-002"
        self.QUERY_EMBEDDINGS_MODEL = "text-embedding-ada-002"

        self.SEPARATOR = "\n* "

        self.tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
        self.separator_len = len(self.tokenizer.tokenize(self.SEPARATOR))

    def compute_repository_embeddings(self):
        try:
            playground_data_path = os.path.join(self.workspace_path, "playground_data")

            # Delete the contents of the playground_data directory but not the
            # directory itself, so that no old data from a previous run lies around
            for filename in os.listdir(playground_data_path):
                file_path = os.path.join(playground_data_path, filename)
                try:
                    if os.path.isfile(file_path) or os.path.islink(file_path):
                        os.unlink(file_path)
                    elif os.path.isdir(file_path):
                        shutil.rmtree(file_path)
                except Exception as e:
                    print(f"Failed to delete {file_path}. Reason: {e}")
        except Exception as e:
            print(f"Error: {e}")

        # extract and save info to csv
        info = self.extract_info(REPOSITORY_PATH)
        self.save_info_to_csv(info)

        repository_info_path = os.path.join(self.workspace_path, "playground_data", "repository_info.csv")
        doc_embeddings_path = os.path.join(self.workspace_path, "playground_data", "doc_embeddings.csv")

        df = pd.read_csv(repository_info_path)
        df = df.set_index(["filePath", "lineCoverage"])
        self.df = df
        context_embeddings = self.compute_doc_embeddings(df)
        self.save_doc_embeddings_to_csv(context_embeddings, df, doc_embeddings_path)

        try:
            self.document_embeddings = self.load_embeddings(doc_embeddings_path)
        except Exception:
            pass

    # Extract information from files in the repository in chunks
    # Return a list of (filePath, lineCoverage, chunkContent) tuples
    def extract_info(self, repository_path):
        # Initialize an empty list to store the information
        info = []

        LINES_PER_CHUNK = 60

        # Iterate through the files in the repository
        for root, dirs, files in os.walk(repository_path):
            for file in files:
                file_path = os.path.join(root, file)

                # Read the contents of the file, skipping anything unreadable or non-UTF-8
                try:
                    with open(file_path, "r", encoding="utf-8") as f:
                        contents = f.read()
                except (OSError, UnicodeDecodeError):
                    continue

                # Split the contents into lines and ignore empty lines
                lines = contents.split("\n")
                lines = [line for line in lines if line.strip()]

                # Split the lines into chunks of LINES_PER_CHUNK lines
                chunks = [
                    lines[i:i + LINES_PER_CHUNK]
                    for i in range(0, len(lines), LINES_PER_CHUNK)
                ]

                # Iterate through the chunks
                for i, chunk in enumerate(chunks):
                    # Join the lines in the chunk back into a single string
                    chunk = "\n".join(chunk)

                    # Get the first and last line numbers covered by this chunk
                    first_line = i * LINES_PER_CHUNK + 1
                    last_line = first_line + len(chunk.split("\n")) - 1
                    line_coverage = (first_line, last_line)

                    # Add the file path, line coverage, and content to the list
                    info.append((file_path, line_coverage, chunk))

        # Return the list of information
        return info

    def save_info_to_csv(self, info):
        # Open a CSV file for writing
        os.makedirs(os.path.join(self.workspace_path, "playground_data"), exist_ok=True)
        csv_path = os.path.join(self.workspace_path, "playground_data", "repository_info.csv")
        with open(csv_path, "w", newline="") as csvfile:
            # Create a CSV writer
            writer = csv.writer(csvfile)

            # Write the header row
            writer.writerow(["filePath", "lineCoverage", "content"])

            # Write a row for each chunk of data
            for file_path, line_coverage, content in info:
                writer.writerow([file_path, line_coverage, content])

    def get_relevant_code_chunks(self, task_description: str, task_context: str):
        query = task_description + "\n" + task_context
        most_relevant_document_sections = self.order_document_sections_by_query_similarity(query, self.document_embeddings)
        selected_chunks = []
        for _, section_index in most_relevant_document_sections:
            try:
                document_section = self.df.loc[section_index]
                selected_chunks.append(self.SEPARATOR + document_section["content"].replace("\n", " "))
                if len(selected_chunks) >= 2:
                    break
            except KeyError:
                pass
        return selected_chunks

    def get_embedding(self, text: str, model: str) -> list[float]:
        result = openai.Embedding.create(
            model=model,
            input=text
        )
        return result["data"][0]["embedding"]

    def get_doc_embedding(self, text: str) -> list[float]:
        return self.get_embedding(text, self.DOC_EMBEDDINGS_MODEL)

    def get_query_embedding(self, text: str) -> list[float]:
        return self.get_embedding(text, self.QUERY_EMBEDDINGS_MODEL)

    def compute_doc_embeddings(self, df: pd.DataFrame) -> dict[tuple[str, str], list[float]]:
        """
        Create an embedding for each row in the dataframe using the OpenAI Embeddings API.

        Return a dictionary that maps the index of each row to its embedding vector.
        """
        embeddings = {}
        for idx, r in df.iterrows():
            # Wait one second before making the next call to the OpenAI Embeddings API,
            # to stay under the rate limit
            time.sleep(1)
            embeddings[idx] = self.get_doc_embedding(r.content.replace("\n", " "))
        return embeddings

    def save_doc_embeddings_to_csv(self, doc_embeddings: dict, df: pd.DataFrame, csv_filepath: str):
        # Get the dimensionality of the embedding vectors from the first entry
        # in the doc_embeddings dictionary
        if len(doc_embeddings) == 0:
            return

        EMBEDDING_DIM = len(list(doc_embeddings.values())[0])

        # Create a new dataframe with filePath, lineCoverage, and one column per embedding dimension
        embeddings_df = pd.DataFrame(columns=["filePath", "lineCoverage"] + [f"{i}" for i in range(EMBEDDING_DIM)])

        # Iterate over the rows in the original dataframe
        for idx, _ in df.iterrows():
            # Get the embedding vector for the current row
            embedding = doc_embeddings[idx]
            # Create a new row with the filePath, lineCoverage, and embedding vector values
            row = [idx[0], idx[1]] + embedding
            embeddings_df.loc[len(embeddings_df)] = row

        # Save the embeddings dataframe to a CSV file
        embeddings_df.to_csv(csv_filepath, index=False)

    def vector_similarity(self, x: list[float], y: list[float]) -> float:
        return np.dot(np.array(x), np.array(y))

    def order_document_sections_by_query_similarity(self, query: str, contexts: dict[tuple[str, str], list[float]]) -> list[tuple[float, tuple[str, str]]]:
        """
        Find the query embedding for the supplied query, and compare it against all of the
        pre-calculated document embeddings to find the most relevant sections.

        Return the list of document sections, sorted by relevance in descending order.
""" query_embedding = self.get_query_embedding(query) document_similarities = sorted([ (self.vector_similarity(query_embedding, doc_embedding), doc_index) for doc_index, doc_embedding in contexts.items() ], reverse=True) return document_similarities def load_embeddings(self, fname: str) -> dict[tuple[str, str], list[float]]: df = pd.read_csv(fname, header=0) max_dim = max([int(c) for c in df.columns if c != "filePath" and c != "lineCoverage"]) return { (r.filePath, r.lineCoverage): [r[str(i)] for i in range(max_dim + 1)] for _, r in df.iterrows() }