Spaces:
Running
Running
import fileinput | |
import io | |
import json | |
import os | |
import pathlib | |
import sys | |
from functools import wraps | |
from typing import List, Union | |
# import google.auth | |
class Logger(object): | |
def __init__(self, filename="Default.log"): | |
self.terminal = sys.stdout | |
self.log = open(filename, "a") | |
def write(self, message): | |
self.terminal.write(message) | |
self.log.write(message) | |
def flush(self): | |
pass | |
def log_to_file(file_name="Default.log"): | |
def decorator(func): | |
def wrapper(*args, **kwargs): | |
# Save the current stdout and stderr | |
original_stdout = sys.stdout | |
original_stderr = sys.stderr | |
# Redirect stdout and stderr to the log file | |
logger = Logger(file_name) | |
sys.stdout = logger | |
sys.stderr = logger | |
try: | |
# Call the original function | |
result = func(*args, **kwargs) | |
return result | |
finally: | |
# Reset stdout and stderr | |
sys.stdout = original_stdout | |
sys.stderr = original_stderr | |
return wrapper | |
return decorator | |
# doesn't work directly, need to setup Google Cloud credentials if not present | |
# src: https://developers.google.com/drive/api/guides/manage-downloads#download-content | |
# def download_file(real_file_id): | |
# # dataset link: https://drive.google.com/drive/folders/1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh?usp=sharing | |
# """Downloads a file | |
# Args: | |
# real_file_id: ID of the file to download | |
# Returns : IO object with location. | |
# | |
# Load pre-authorized user credentials from the environment. | |
# TODO(developer) - See https://developers.google.com/identity | |
# for guides on implementing OAuth2 for the application. | |
# """ | |
# creds, _ = google.auth.default() | |
# | |
# try: | |
# # create drive api client | |
# service = build("drive", "v3", credentials=creds) | |
# | |
# file_id = real_file_id | |
# | |
# # pylint: disable=maybe-no-member | |
# request = service.files().get_media(fileId=file_id) | |
# file = io.BytesIO() | |
# downloader = MediaIoBaseDownload(file, request) | |
# done = False | |
# while done is False: | |
# status, done = downloader.next_chunk() | |
# print(f"Download {int(status.progress() * 100)}.") | |
# | |
# except HttpError as error: | |
# print(f"An error occurred: {error}") | |
# file = None | |
# | |
# return file.getvalue() | |
def read_from_all_files(all_files_to_read: List[Union[str, pathlib.Path]], batch_size: int = 1000, | |
batch_num: int = None, | |
encoding: str = "utf-8", | |
reading_only_specific_files: List[str] = None) -> List: | |
""" | |
bas basic generator that yields a batch of lines, leverages in-built fileinput for reading all files and using same file object | |
:param all_files_to_read: list of file paths, str or Path | |
:param batch_size: the number of maximum lines to yield | |
:param batch_num: the number of batches to yield and then stop, added later for testing | |
:return: List of text lines | |
""" | |
print("\n=========\nReading dataset\n=============") | |
counter = 0 | |
if reading_only_specific_files: | |
for idx, f_name in enumerate(all_files_to_read): | |
if not all(x in f_name for x in reading_only_specific_files): | |
all_files_to_read.pop(idx) | |
print(f"\nCount of files to read...{len(all_files_to_read)}") | |
all_files_to_read = sorted(all_files_to_read) | |
with fileinput.input(files=all_files_to_read, | |
encoding=encoding) as f: # in-built fileinput to read all files, efficient, handles things internally | |
batch = [] | |
for line in f: | |
# print(f"file number: {f.fileno()}") | |
# print(f"file-line number: {f.filelineno()}") | |
# print(line) | |
if line != '\n': | |
batch.append(line) | |
if len(batch) == batch_size: | |
counter += 1 | |
yield batch | |
if batch_num and counter == batch_num: | |
break | |
batch = [] | |
if batch: | |
yield batch | |
print(f"\nFinal counter value: {counter}") | |
print("\n=========\nReading dataset done\n=============") | |
def read_chunks_from_file(file_path, chunk_size=4 * 1024 * 1024, encoding="utf-8"): | |
""" | |
helper function to yield chunk_size of data read from the file_path given | |
""" | |
file_path = os.path.abspath(file_path) | |
with open(file_path, 'r', encoding=encoding) as f: | |
for chunk in iter(lambda: f.read(chunk_size), b''): | |
yield chunk | |
def get_all_text_dataset(path: str | pathlib.Path, file_type=".txt") -> List: | |
""" | |
Helper function to get all .txt files' given a path or root directory, uses glob recursively to find the given format files | |
:param path: str or Path object, root directory for a dataset | |
:param file_type: format of files to get | |
:return: list of path of all files of the specified format | |
""" | |
files = [] | |
# first convert json data to text and then process text | |
convert_json_data_to_text_and_process_text(dir_path="./web-scrapper", | |
file_type=".json", | |
output_file_path="./dataset/combined_from_crawler-json.txt") | |
for txt_file in pathlib.Path(path).rglob('*' + file_type): | |
files.append(txt_file) | |
return files | |
# def get_data_batch(all_files, chunk_size=100 * 1024 * 1024, formats=".txt"): | |
# for file in all_files: | |
# yield from read_chunks_from_file(file) | |
def convert_json_data_to_text_and_process_text(dir_path, file_type=".json", output_file_path="crawler_data.txt"): | |
""" | |
Helper function to convert JSON data to text and then process the text | |
""" | |
with open(output_file_path, "w", encoding="utf-8") as f_out: | |
for json_file in pathlib.Path(dir_path).rglob('*' + file_type): | |
with open(json_file, "r", encoding="utf-8") as f: | |
data = json.load(f) | |
for item in data: | |
f_out.write(" ".join(item["text"]) + "\n") | |
if __name__ == "__main__": | |
download_file(real_file_id="1KD7v4eW2ZKQ0Re_6lXRuaaVswvS3IFIh") | |