# HindiTokenizer / utilities.py
# Manu101's picture
# Upload utilities.py
# 258e5d5 verified
import fileinput
import io
import json
import os
import pathlib
import sys
from functools import wraps
from typing import Iterator, List, Union
# import google.auth
class Logger(object):
    """
    File-like object that mirrors every write to the terminal and to a log file.

    ``terminal`` is whatever ``sys.stdout`` is at construction time; ``log`` is the
    log file opened in append mode.

    :param filename: path of the log file to append to
    """

    def __init__(self, filename="Default.log"):
        self.terminal = sys.stdout
        # Explicit UTF-8 so non-ASCII (e.g. Devanagari) text can be logged regardless of
        # the platform's default encoding.
        self.log = open(filename, "a", encoding="utf-8")

    def write(self, message):
        """Write ``message`` to the terminal and, while it is still open, to the log file."""
        self.terminal.write(message)
        if not self.log.closed:
            self.log.write(message)

    def flush(self):
        """Flush both underlying streams so buffered output is not lost."""
        self.terminal.flush()
        if not self.log.closed:
            self.log.flush()

    def close(self):
        """Close the log file (safe to call more than once); the terminal stream is left open."""
        if not self.log.closed:
            self.log.close()
def log_to_file(file_name="Default.log"):
    """
    Decorator factory: while the decorated function runs, ``sys.stdout`` and ``sys.stderr``
    are replaced by a ``Logger`` so everything printed is also appended to ``file_name``.

    Both streams are redirected to the same ``Logger`` instance, so stderr output is echoed
    through the logger's terminal stream as well.

    :param file_name: path of the log file the output is appended to
    :return: a decorator that wraps a function with the redirection
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Save the current stdout and stderr so they can be restored afterwards
            original_stdout = sys.stdout
            original_stderr = sys.stderr

            # Redirect stdout and stderr to the log file
            logger = Logger(file_name)
            sys.stdout = logger
            sys.stderr = logger

            try:
                # Call the original function
                return func(*args, **kwargs)
            finally:
                # Restore the streams first, then release the log file handle; previously
                # a new handle was opened on every call and never closed.
                sys.stdout = original_stdout
                sys.stderr = original_stderr
                logger.log.close()

        return wrapper

    return decorator
# NOTE: the download_file helper below is disabled. It does not work out of the box:
# Google Cloud credentials need to be set up first if they are not already present.
# src: https://developers.google.com/drive/api/guides/manage-downloads#download-content
# def download_file(real_file_id):
# # dataset link: https://drive.google.com/drive/folders/1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh?usp=sharing
# """Downloads a file
# Args:
# real_file_id: ID of the file to download
# Returns : IO object with location.
#
# Load pre-authorized user credentials from the environment.
# TODO(developer) - See https://developers.google.com/identity
# for guides on implementing OAuth2 for the application.
# """
# creds, _ = google.auth.default()
#
# try:
# # create drive api client
# service = build("drive", "v3", credentials=creds)
#
# file_id = real_file_id
#
# # pylint: disable=maybe-no-member
# request = service.files().get_media(fileId=file_id)
# file = io.BytesIO()
# downloader = MediaIoBaseDownload(file, request)
# done = False
# while done is False:
# status, done = downloader.next_chunk()
# print(f"Download {int(status.progress() * 100)}.")
#
# except HttpError as error:
# print(f"An error occurred: {error}")
# file = None
#
# return file.getvalue()
def read_from_all_files(all_files_to_read: List[Union[str, pathlib.Path]], batch_size: int = 1000,
batch_num: int = None,
encoding: str = "utf-8",
reading_only_specific_files: List[str] = None) -> List:
"""
bas basic generator that yields a batch of lines, leverages in-built fileinput for reading all files and using same file object
:param all_files_to_read: list of file paths, str or Path
:param batch_size: the number of maximum lines to yield
:param batch_num: the number of batches to yield and then stop, added later for testing
:return: List of text lines
"""
print("\n=========\nReading dataset\n=============")
counter = 0
if reading_only_specific_files:
for idx, f_name in enumerate(all_files_to_read):
if not all(x in f_name for x in reading_only_specific_files):
all_files_to_read.pop(idx)
print(f"\nCount of files to read...{len(all_files_to_read)}")
all_files_to_read = sorted(all_files_to_read)
with fileinput.input(files=all_files_to_read,
encoding=encoding) as f: # in-built fileinput to read all files, efficient, handles things internally
batch = []
for line in f:
# print(f"file number: {f.fileno()}")
# print(f"file-line number: {f.filelineno()}")
# print(line)
if line != '\n':
batch.append(line)
if len(batch) == batch_size:
counter += 1
yield batch
if batch_num and counter == batch_num:
break
batch = []
if batch:
yield batch
print(f"\nFinal counter value: {counter}")
print("\n=========\nReading dataset done\n=============")
def read_chunks_from_file(file_path, chunk_size=4 * 1024 * 1024, encoding="utf-8"):
    """
    Helper generator that yields successive chunks of text read from ``file_path``.

    The file is opened in text mode, so ``chunk_size`` counts characters, not bytes.

    :param file_path: path of the file to read
    :param chunk_size: maximum number of characters per chunk
    :param encoding: text encoding used to open the file
    :return: yields non-empty ``str`` chunks until the end of the file
    """
    file_path = os.path.abspath(file_path)
    with open(file_path, 'r', encoding=encoding) as f:
        # In text mode read() returns '' at EOF; the previous b'' sentinel never matched,
        # so the generator kept yielding empty strings forever.
        while chunk := f.read(chunk_size):
            yield chunk
def get_all_text_dataset(path: str | pathlib.Path, file_type=".txt",
                         json_dir_path: str | pathlib.Path = "./web-scrapper",
                         json_output_file_path: str | pathlib.Path = "./dataset/combined_from_crawler-json.txt"
                         ) -> List[pathlib.Path]:
    """
    Helper function to get all files of a given format under a path or root directory;
    searches recursively with ``Path.rglob``.

    Before searching, the JSON files under ``json_dir_path`` are converted into one
    combined text file at ``json_output_file_path`` (rewritten on every call).

    :param path: str or Path object, root directory for a dataset
    :param file_type: format (suffix) of files to get
    :param json_dir_path: directory searched for JSON files to convert; the default is the
        previously hard-coded location
    :param json_output_file_path: text file the converted JSON data is written to; the
        default is the previously hard-coded location
    :return: list of paths of all files of the specified format
    """
    # first convert json data to text and then process text
    convert_json_data_to_text_and_process_text(dir_path=json_dir_path,
                                               file_type=".json",
                                               output_file_path=json_output_file_path)

    return list(pathlib.Path(path).rglob('*' + file_type))
# def get_data_batch(all_files, chunk_size=100 * 1024 * 1024, formats=".txt"):
# for file in all_files:
# yield from read_chunks_from_file(file)
def convert_json_data_to_text_and_process_text(dir_path, file_type=".json", output_file_path="crawler_data.txt"):
    """
    Helper function to convert JSON data to text: every JSON file found recursively under
    ``dir_path`` is loaded and one line per item is written to ``output_file_path``.

    Each item's ``"text"`` value is joined with single spaces to form the line.
    NOTE(review): this assumes each JSON file holds a list of objects whose ``"text"`` is a
    list of strings - confirm against the crawler output (a plain string would be joined
    character by character).

    :param dir_path: root directory searched recursively for input files
    :param file_type: suffix of the input files
    :param output_file_path: text file to (over)write; missing parent directories are created
    """
    output_path = pathlib.Path(output_file_path)
    # Create the target directory so a fresh checkout does not fail with FileNotFoundError.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, "w", encoding="utf-8") as f_out:
        # Sorted so the combined file does not depend on directory listing order.
        for json_file in sorted(pathlib.Path(dir_path).rglob('*' + file_type)):
            with open(json_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            # The input file is closed before writing; only the parsed data is needed.
            for item in data:
                f_out.write(" ".join(item["text"]) + "\n")
if __name__ == "__main__":
    # download_file exists only as commented-out code in this module (it needs Google Cloud
    # credentials), so calling it here raised NameError. Exit with an explanation instead.
    # The dataset ID that used to be passed was "1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh".
    sys.exit("download_file is disabled: set up Google Cloud credentials and "
             "re-enable it in utilities.py before running this module as a script.")