import argparse
import numpy as np
import pandas as pd
import time
from datetime import datetime, timedelta
from pytz import timezone
import re
import json
import os
import threading
from functools import wraps

import torch

import config
from data_utils.image_utils import (
    load_image,
    resize_coordinates_and_image_to_fit_to_maximum_pixel_counts,
)

# Global lock used by `thread_func` to serialize GPU-bound calls across threads.
lock = threading.Lock()


def check_gpu():
    if torch.cuda.is_available():
        current_device = torch.cuda.current_device()
        device_name = torch.cuda.get_device_name(current_device)
        print(f"Using GPU Device: {current_device} - {device_name}")
    else:
        print("CUDA is not available.")


def record_and_save_gpu_memory_usage(func):
    """Record CUDA allocator history while `func` runs and save SVG reports."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        torch.cuda.memory._record_memory_history(enabled=True)
        result = func(*args, **kwargs)
        # Make sure the output directory exists, and save the reports before
        # disabling recording so the collected history is still available.
        os.makedirs("snapshot", exist_ok=True)
        torch.cuda.memory._save_segment_usage(filename="snapshot/segment_usage.svg")
        torch.cuda.memory._save_memory_usage(filename="snapshot/memory_usage.svg")
        torch.cuda.memory._record_memory_history(enabled=False)
        return result

    return wrapper


def measure_gpu_time_and_memory(func):
    """Report the change in reserved CUDA memory across a call to `func`."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        cuda = kwargs.get("cuda", True)  # Default to True if 'cuda' is not provided.
        start_memory = torch.cuda.memory_reserved() if cuda else 0  # Initial memory.
        result = func(*args, **kwargs)
        end_memory = torch.cuda.memory_reserved() if cuda else 0  # Final memory.
        if cuda:
            print(
                f"{func.__name__} Initial CUDA memory reserved: {start_memory / (1024 ** 3):.2f} GB"
            )
            print(
                f"{func.__name__} Final CUDA memory reserved: {end_memory / (1024 ** 3):.2f} GB"
            )
            print(
                f"{func.__name__} CUDA memory change: {(end_memory - start_memory) / (1024 ** 3):.2f} GB"
            )
        return result

    return wrapper


def timeit(func):
    """Print the wall-clock time of a call when it is invoked with `debug=True`."""

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        total_time = time.perf_counter() - start_time
        if kwargs.get("debug", False):
            print(f"{func.__name__} : {total_time:.4f} sec..")
        return result

    return timeit_wrapper


def async_timeit(func):
    """Async counterpart of `timeit` for coroutine functions."""

    @wraps(func)
    async def timeit_wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        total_time = time.perf_counter() - start_time
        if kwargs.get("debug", False):
            print(f"{func.__name__} : {total_time:.4f} sec..")
        return result

    return timeit_wrapper


def thread_func(func):
    """Serialize calls across threads, then release cached CUDA memory."""

    @wraps(func)
    def thread_func_wrapper(*args, **kwargs):
        # Use a context manager so the lock is released even if `func` raises.
        with lock:
            result = func(*args, **kwargs)
        torch.cuda.empty_cache()
        return result

    return thread_func_wrapper


def get_arguments():
    parser = argparse.ArgumentParser(description="text_remover")

    parser.add_argument("--image")
    parser.add_argument("--dir")
    parser.add_argument("--json")
    parser.add_argument("--refine", action="store_true", default=False)
    parser.add_argument("--preserve_resolution", action="store_true", default=False)
    parser.add_argument("--pixel_thresh", type=int)  # Evaluate text stroke mask.

    parser.add_argument("--prepare_kaist", action="store_true", default=False)
    parser.add_argument("--kaist_all_zip")
    parser.add_argument("--data_dir")

    args = parser.parse_args()
    return args


def get_elapsed_time(start_time):
    return timedelta(seconds=round(time.time() - start_time))


def get_current_time():
    return str(datetime.now(timezone("Asia/Seoul"))).replace(" ", "-").rsplit(".", 1)[0]
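# A minimal usage sketch of the decorators above (commented out so importing
# this module stays side-effect free). `dummy_matmul` is a hypothetical
# function, not part of this codebase: `debug=True` triggers the `timeit`
# printout and `cuda=True` the reserved-memory report.
#
# @timeit
# @measure_gpu_time_and_memory
# def dummy_matmul(n, cuda=True, debug=False):
#     device = "cuda" if cuda and torch.cuda.is_available() else "cpu"
#     x = torch.randn(n, n, device=device)
#     return (x @ x).sum().item()
#
# check_gpu()
# dummy_matmul(1024, cuda=torch.cuda.is_available(), debug=True)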
"-").rsplit(".", 1)[0] def parse_csv_file(path_csv, resize=False): df = pd.read_csv(path_csv) ls_rows = list() for coor, content in df[["coordinates", "content"]].values: coor = re.sub(pattern=r"\(|\)", repl="", string=coor) coor = coor.split(",") rect = list(map(int, coor)) ls_rows.append((rect[2], rect[3], rect[0], rect[1], content)) bboxes = pd.DataFrame( ls_rows, columns=["xmin", "ymin", "xmax", "ymax", "transcript"] ) bboxes["area"] = bboxes.apply( lambda x: (x["xmax"] - x["xmin"]) * (x["ymax"] - x["ymin"]), axis=1 ) bboxes.sort_values(["area"], inplace=True) bboxes.drop(["area"], axis=1, inplace=True) img_url = df["image_url"].values[0] img = load_image(img_url) if resize: bboxes, img = resize_coordinates_and_image_to_fit_to_maximum_pixel_counts( ha_bboxs=bboxes, img=img ) return bboxes, img, img_url def parse_json_file(json_path): with open(json_path, mode="r") as f: req = json.load(f) img_url = req["data"]["data"]["req"]["image_url"] img = load_image(img_url) coors = req["data"]["data"]["req"]["coordinates"] bboxes = pd.DataFrame(coors, columns=["xmin", "ymin", "xmax", "ymax"]) return bboxes, img, img_url def parse_transcription_df(csv_path, index=0): df = pd.read_csv(csv_path) ls_rows = list() for idx, (img_url, df_groupby) in enumerate(df.groupby("image_url")): if idx != index: continue img = load_image(img_url) # for img_url, coor, ori_content, tr_content in df_groupby.values: for item_org_id, img_url, coor, ori_content, tr_content in df_groupby.values: coor = re.sub(pattern=r"\(|\)|\.0", repl="", string=coor) coor = coor.split(",") rect = list(map(int, coor)) # ls_rows.append((rect[2], rect[3], rect[0], rect[1], ori_content, tr_content)) ls_rows.append( ( item_org_id, rect[2], rect[3], rect[0], rect[1], ori_content, tr_content, ) ) bboxes = pd.DataFrame( # ls_rows, columns=["xmin", "ymin", "xmax", "ymax", "ori_content", "tr_content"] ls_rows, columns=[ "item_org_id", "xmin", "ymin", "xmax", "ymax", "ori_content", "tr_content", ], ) return bboxes, img, img_url if __name__ == "__main__": pass # font = ImageFont.truetype( # font="/Users/jongbeomkim/Desktop/workspace/image_processing_server/fonts/NotoSansThai-ExtraBold.ttf", # size=round(30), # )