""" | |
This is the template for implementing the rankers for your search engine. | |
You will be implementing WordCountCosineSimilarity, DirichletLM, TF-IDF, BM25, Pivoted Normalization, and your own ranker. | |
""" | |
from collections import Counter, defaultdict | |
from indexing import InvertedIndex | |
import math | |


class Ranker:
    """
    The ranker class is responsible for generating a list of documents for a given query, ordered by their scores
    using a particular relevance function (e.g., BM25).
    A Ranker can be configured with any RelevanceScorer.
    """
    # TODO: Implement this class properly; this is responsible for returning a list of sorted relevant documents

    def __init__(self, index: InvertedIndex, document_preprocessor, stopwords: set[str], scorer: 'RelevanceScorer') -> None:
        """
        Initializes the state of the Ranker object.

        TODO (HW3): Previous homeworks had you passing the class of the scorer to this function.
            This has been changed as it created a lot of confusion.
            You should now pass an instantiated RelevanceScorer to this function.

        Args:
            index: An inverted index
            document_preprocessor: The DocumentPreprocessor to use for turning strings into tokens
            stopwords: The set of stopwords to use or None if no stopword filtering is to be done
            scorer: The RelevanceScorer object
        """
        self.index = index
        self.tokenize = document_preprocessor.tokenize
        self.scorer = scorer
        self.stopwords = stopwords

    def query(self, query: str) -> list[tuple[int, float]]:
        """
        Searches the collection for documents relevant to the query and
        returns a list of documents ordered by their relevance (most relevant first).

        Args:
            query: The query to search for

        Returns:
            A sorted list of (docid, score) tuples, where docid identifies a document in the
            collection and score is that document's relevance, ordered from most to least relevant

        TODO (HW3): We are standardizing the query output of Ranker to match with L2RRanker.query and VectorRanker.query.
            The query function should return a sorted list of tuples where each tuple has the first element as the document ID
            and the second element as the score of the document after the ranking process.
        """
        # TODO: Tokenize the query and remove stopwords, if needed
        tokens = self.tokenize(query)
        query_parts = [token for token in tokens if token not in self.stopwords] if self.stopwords else tokens
        # TODO: Fetch a list of possible documents from the index and create a mapping from
        # a document ID to a dictionary of the counts of the query terms in that document.
        # You will pass the dictionary to the RelevanceScorer as input.
        doc_word_counts = defaultdict(Counter)
        query_word_counts = Counter(query_parts)
        for term in query_word_counts:
            # Each posting is a (docid, term_frequency) pair returned by the index
            postings = self.index.get_postings(term)
            for posting in postings:
                doc_word_counts[posting[0]][term] = posting[1]

        # TODO: Rank the documents using a RelevanceScorer (like BM25 from below classes)
        results = []
        for docid in doc_word_counts:
            res = self.scorer.score(docid, doc_word_counts[docid], query_word_counts)
            if res is not None:
                results.append((docid, res))

        # TODO: Return the **sorted** results in the format [(100, 0.5), (10, 0.2), ...]
        results.sort(key=lambda x: x[1], reverse=True)
        return results
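
# Example usage of Ranker (a minimal sketch; `my_index`, `my_preprocessor`, and the
# stopword set are placeholders for objects built elsewhere by your indexing and
# document-preprocessing code):
#
#     scorer = BM25(my_index)
#     ranker = Ranker(my_index, my_preprocessor, stopwords={'the', 'a', 'of'}, scorer=scorer)
#     results = ranker.query('search engine ranking')
#     # results -> [(docid, score), ...] sorted by score, highest first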


class RelevanceScorer:
    """
    This is the base interface for all the relevance scoring algorithms.
    It will take a document and attempt to assign a score to it.
    """
    # TODO: Implement the functions in the child classes (WordCountCosineSimilarity, DirichletLM, BM25,
    # PivotedNormalization, TF_IDF) and not in this one

    def __init__(self, index: InvertedIndex, parameters) -> None:
        raise NotImplementedError

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        """
        Returns a score for how relevant the document is to the provided query.

        Args:
            docid: The ID of the document
            doc_word_counts: A dictionary containing all words in the document and their frequencies.
                Words that have been filtered will be None.
            query_word_counts: A dictionary containing all words in the query and their frequencies.
                Words that have been filtered will be None.

        Returns:
            A score for how relevant the document is (higher scores are more relevant)
        """
        raise NotImplementedError


# TODO (HW1): Implement unnormalized cosine similarity on word count vectors
class WordCountCosineSimilarity(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={}) -> None:
        self.index = index
        self.parameters = parameters

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Find the dot product of the word count vector of the document and the word count vector of the query
        # 2. Return the score
        cwq = query_word_counts
        score = 0
        matched = False
        for word in cwq:
            if word in doc_word_counts:
                matched = True
                score += cwq[word] * doc_word_counts[word]
        if not matched:
            return None
        return score
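
# Reference formula for WordCountCosineSimilarity above (unnormalized dot product over shared terms):
#     score(q, d) = sum over w in q ∩ d of c(w, q) * c(w, d)
# e.g. query counts {'cat': 2, 'dog': 1} and document counts {'cat': 3} give 2 * 3 = 6.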


# TODO (HW1): Implement DirichletLM
class DirichletLM(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={'mu': 2000}) -> None:
        self.index = index
        self.parameters = parameters
        self.mu = parameters['mu']

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Get necessary information from index
        # 2. Compute additional terms to use in algorithm
        # 3. For all query_parts, compute score
        # 4. Return the score
        cwq = query_word_counts
        q_len = sum(cwq.values())
        matched = False
        score = 0
        for term in cwq:
            if term in doc_word_counts and docid in self.index.document_metadata:
                matched = True
                pwc = self.index.get_term_metadata(term)['count'] / self.index.statistics['total_token_count']
                first_part = cwq[term] * math.log(1 + doc_word_counts[term] / (self.mu * pwc))
                score += first_part
        # The document-length normalization term is added once per document, after the term loop
        if docid in self.index.document_metadata:
            second_part = q_len * math.log(self.mu / (self.mu + self.index.document_metadata[docid]['length']))
            score += second_part
        if not matched:
            return None
        return score
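
# Reference formula for DirichletLM above (Dirichlet-smoothed query likelihood):
#     score(q, d) = sum over w in q ∩ d of c(w, q) * ln(1 + c(w, d) / (mu * P(w|C)))
#                   + |q| * ln(mu / (mu + |d|))
# where P(w|C) is the term's collection frequency divided by the total token count of the collection.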


# TODO (HW1): Implement BM25
class BM25(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={'b': 0.75, 'k1': 1.2, 'k3': 8}) -> None:
        self.index = index
        self.b = parameters['b']
        self.k1 = parameters['k1']
        self.k3 = parameters['k3']

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Get necessary information from index
        # 2. Find the dot product of the word count vector of the document and the word count vector of the query
        # 3. For all query parts, compute the TF and IDF to get a score
        # 4. Return score
        cwq = query_word_counts
        info = self.index.statistics
        avg_dl = info['mean_document_length']
        N = info['number_of_documents']
        score = 0
        matched = False
        for term in cwq:
            if term in doc_word_counts and docid in self.index.document_metadata:
                matched = True
                df = self.index.get_term_metadata(term)['document_count']
                ctd = doc_word_counts[term]
                first_part = math.log((N - df + 0.5) / (df + 0.5))
                second_part = ((self.k1 + 1) * ctd) \
                    / (self.k1 * (1 - self.b + self.b * self.index.document_metadata[docid]['length'] / avg_dl) + ctd)
                third_part = cwq[term] * (self.k3 + 1) / (self.k3 + cwq[term])
                score += first_part * second_part * third_part
        if not matched:
            return None
        return score
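
# Reference formula for BM25 above (with query-term frequency weighting):
#     IDF = ln((N - df(w) + 0.5) / (df(w) + 0.5))
#     TF  = ((k1 + 1) * c(w, d)) / (k1 * (1 - b + b * |d| / avgdl) + c(w, d))
#     QTF = ((k3 + 1) * c(w, q)) / (k3 + c(w, q))
#     score(q, d) = sum over w in q ∩ d of IDF * TF * QTF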


# TODO (HW1): Implement Pivoted Normalization
class PivotedNormalization(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={'b': 0.2}) -> None:
        self.index = index
        self.b = parameters['b']

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Get necessary information from index
        # 2. Compute additional terms to use in algorithm
        # 3. For all query parts, compute the TF, IDF, and QTF values to get a score
        # 4. Return the score
        cwq = query_word_counts
        info = self.index.statistics
        avg_dl = info['mean_document_length']
        N = info['number_of_documents']
        score = 0
        matched = False
        for term in cwq:
            if term in doc_word_counts and docid in self.index.document_metadata:
                matched = True
                first_part = cwq[term]
                second_part = (1 + math.log(1 + math.log(doc_word_counts[term]))) \
                    / (1 - self.b + self.b * self.index.document_metadata[docid]['length'] / avg_dl)
                third_part = math.log((N + 1) / self.index.get_term_metadata(term)['document_count'])
                score += first_part * second_part * third_part
        if not matched:
            return None
        return score
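
# Reference formula for PivotedNormalization above (pivoted document-length normalization):
#     score(q, d) = sum over w in q ∩ d of
#         c(w, q) * (1 + ln(1 + ln(c(w, d)))) / (1 - b + b * |d| / avgdl) * ln((N + 1) / df(w))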


# TODO (HW1): Implement TF-IDF
class TF_IDF(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={}) -> None:
        self.index = index
        self.parameters = parameters

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Get necessary information from index
        # 2. Compute additional terms to use in algorithm
        # 3. For all query parts, compute the TF, IDF, and QTF values to get a score
        # 4. Return the score
        cwq = query_word_counts
        doc_total = self.index.statistics['number_of_documents']
        score = 0
        matched = False
        for term in cwq:
            if term in doc_word_counts:
                matched = True
                score += math.log(doc_word_counts[term] + 1) \
                    * (1 + math.log(doc_total / self.index.get_term_metadata(term)['document_count'])) \
                    * cwq[term]
        if not matched:
            return None
        return score
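
# Reference formula for TF_IDF above:
#     score(q, d) = sum over w in q ∩ d of ln(c(w, d) + 1) * (1 + ln(N / df(w))) * c(w, q)
# e.g. with N = 100, df(w) = 10, c(w, d) = 3, c(w, q) = 1:
#     ln(4) * (1 + ln(10)) * 1 ≈ 1.386 * 3.303 ≈ 4.58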