from __future__ import absolute_import from __future__ import division from __future__ import print_function from pathlib import Path import time import numpy as np import os import sys __dir__ = os.path.dirname(os.path.abspath(__file__)) sys.path.append(__dir__) sys.path.insert(0, os.path.abspath(os.path.join(__dir__, '..'))) os.environ['FLAGS_allocator_strategy'] = 'auto_growth' import cv2 import json import torch from tools.engine import Config from tools.utility import ArgsParser from tools.utils.ckpt import load_ckpt from tools.utils.logging import get_logger from tools.utils.utility import get_image_file_list logger = get_logger() root_dir = Path(__file__).resolve().parent DEFAULT_CFG_PATH_DET = str(root_dir / '../configs/det/dbnet/repvit_db.yml') MODEL_NAME_DET = './openocr_det_repvit_ch.pth' # 模型文件名称 DOWNLOAD_URL_DET = 'https://github.com/Topdu/OpenOCR/releases/download/develop0.0.1/openocr_det_repvit_ch.pth' # 模型文件 URL def check_and_download_model(model_name: str, url: str): """ 检查预训练模型是否存在,若不存在则从指定 URL 下载到固定缓存目录。 Args: model_name (str): 模型文件的名称,例如 "model.pt" url (str): 模型文件的下载地址 Returns: str: 模型文件的完整路径 """ if os.path.exists(model_name): return model_name # 固定缓存路径为用户主目录下的 ".cache/openocr" cache_dir = Path.home() / '.cache' / 'openocr' model_path = cache_dir / model_name # 如果模型文件已存在,直接返回路径 if model_path.exists(): logger.info(f'Model already exists at: {model_path}') return str(model_path) # 如果文件不存在,下载模型 logger.info(f'Model not found. Downloading from {url}...') # 创建缓存目录(如果不存在) cache_dir.mkdir(parents=True, exist_ok=True) try: # 下载文件 import urllib.request with urllib.request.urlopen(url) as response, open(model_path, 'wb') as out_file: out_file.write(response.read()) logger.info(f'Model downloaded and saved at: {model_path}') return str(model_path) except Exception as e: logger.error(f'Error downloading the model: {e}') # 提示用户手动下载 logger.error( f'Unable to download the model automatically. ' f'Please download the model manually from the following URL:\n{url}\n' f'and save it to: {model_name} or {model_path}') raise RuntimeError( f'Failed to download the model. Please download it manually from {url} ' f'and save it to {model_path}') from e def replace_batchnorm(net): for child_name, child in net.named_children(): if hasattr(child, 'fuse'): fused = child.fuse() setattr(net, child_name, fused) replace_batchnorm(fused) elif isinstance(child, torch.nn.BatchNorm2d): setattr(net, child_name, torch.nn.Identity()) else: replace_batchnorm(child) def padding_image(img, size=(640, 640)): """ Padding an image using OpenCV: - If the image is smaller than the target size, pad it to 640x640. - If the image is larger than the target size, split it into multiple 640x640 images and record positions. :param image_path: Path to the input image. :param output_dir: Directory to save the output images. :param size: The target size for padding or splitting (default 640x640). :return: List of tuples containing the coordinates of the top-left corner of each cropped 640x640 image. """ img_height, img_width = img.shape[:2] target_width, target_height = size # If image is smaller than target size, pad the image to 640x640 # Calculate padding amounts (top, bottom, left, right) pad_top = 0 pad_bottom = target_height - img_height pad_left = 0 pad_right = target_width - img_width # Pad the image (white padding, border type: constant) padded_img = cv2.copyMakeBorder(img, pad_top, pad_bottom, pad_left, pad_right, cv2.BORDER_CONSTANT, value=[0, 0, 0]) # Return the padded area positions (top-left and bottom-right coordinates of the original image) return padded_img def resize_image(img, size=(640, 640), over_lap=64): """ Resize an image using OpenCV: - If the image is smaller than the target size, pad it to 640x640. - If the image is larger than the target size, split it into multiple 640x640 images and record positions. :param image_path: Path to the input image. :param output_dir: Directory to save the output images. :param size: The target size for padding or splitting (default 640x640). :return: List of tuples containing the coordinates of the top-left corner of each cropped 640x640 image. """ img_height, img_width = img.shape[:2] target_width, target_height = size # If image is smaller than target size, pad the image to 640x640 if img_width <= target_width and img_height <= target_height: # Calculate padding amounts (top, bottom, left, right) if img_width == target_width and img_height == target_height: return [img], [[0, 0, img_width, img_height]] padded_img = padding_image(img, size) # Return the padded area positions (top-left and bottom-right coordinates of the original image) return [padded_img], [[0, 0, img_width, img_height]] img_height, img_width = img.shape[:2] # If image is larger than or equal to target size, crop it into 640x640 tiles crop_positions = [] count = 0 cropped_img_list = [] for top in range(0, img_height - over_lap, target_height - over_lap): for left in range(0, img_width - over_lap, target_width - over_lap): # Calculate the bottom and right boundaries for the crop right = min(left + target_width, img_width) bottom = min(top + target_height, img_height) if right >= img_width: right = img_width left = max(0, right - target_width) if bottom >= img_height: bottom = img_height top = max(0, bottom - target_height) # Crop the image cropped_img = img[top:bottom, left:right] if bottom - top < target_height or right - left < target_width: cropped_img = padding_image(cropped_img, size) count += 1 cropped_img_list.append(cropped_img) # Record the position of the cropped image crop_positions.append([left, top, right, bottom]) return cropped_img_list, crop_positions def restore_preds(preds, crop_positions, original_size): restored_pred = torch.zeros((1, 1, original_size[0], original_size[1]), dtype=preds.dtype, device=preds.device) count = 0 for cropped_pred, (left, top, right, bottom) in zip(preds, crop_positions): crop_height = bottom - top crop_width = right - left corp_vis_img = cropped_pred[:, :crop_height, :crop_width] mask = corp_vis_img > 0.3 count += 1 restored_pred[:, :, top:top + crop_height, left:left + crop_width] += mask[:, :crop_height, :crop_width].to( preds.dtype) return restored_pred def draw_det_res(dt_boxes, img, img_name, save_path): src_im = img for box in dt_boxes: box = np.array(box).astype(np.int32).reshape((-1, 1, 2)) cv2.polylines(src_im, [box], True, color=(255, 255, 0), thickness=2) if not os.path.exists(save_path): os.makedirs(save_path) save_path = os.path.join(save_path, os.path.basename(img_name)) cv2.imwrite(save_path, src_im) def set_device(device, numId=0): if device == 'gpu' and torch.cuda.is_available(): device = torch.device(f'cuda:{numId}') else: logger.info('GPU is not available, using CPU.') device = torch.device('cpu') return device class OpenDetector(object): def __init__(self, config=None, numId=0): """ 初始化函数。 Args: config (dict, optional): 配置文件,默认为None。如果为None,则使用默认配置文件。 numId (int, optional): 设备编号,默认为0。 Returns: None Raises: 无 """ if config is None: config = Config(DEFAULT_CFG_PATH_DET).cfg if not os.path.exists(config['Global']['pretrained_model']): config['Global']['pretrained_model'] = check_and_download_model( MODEL_NAME_DET, DOWNLOAD_URL_DET) from opendet.modeling import build_model as build_det_model from opendet.postprocess import build_post_process from opendet.preprocess import create_operators, transform self.transform = transform global_config = config['Global'] # build model self.model = build_det_model(config['Architecture']) self.model.eval() load_ckpt(self.model, config) replace_batchnorm(self.model.backbone) self.device = set_device(config['Global']['device'], numId=numId) self.model.to(device=self.device) # create data ops transforms = [] for op in config['Eval']['dataset']['transforms']: op_name = list(op)[0] if 'Label' in op_name: continue elif op_name == 'KeepKeys': op[op_name]['keep_keys'] = ['image', 'shape'] transforms.append(op) self.ops = create_operators(transforms, global_config) # build post process self.post_process_class = build_post_process(config['PostProcess'], global_config) def crop_infer( self, img_path=None, img_numpy_list=None, img_numpy=None, ): if img_numpy is not None: img_numpy_list = [img_numpy] num_img = 1 elif img_path is not None: num_img = len(img_path) elif img_numpy_list is not None: num_img = len(img_numpy_list) else: raise Exception('No input image path or numpy array.') results = [] for img_idx in range(num_img): if img_numpy_list is not None: img = img_numpy_list[img_idx] data = {'image': img} elif img_path is not None: with open(img_path[img_idx], 'rb') as f: img = f.read() data = {'image': img} data = self.transform(data, self.ops[:1]) src_img_ori = data['image'] img_height, img_width = src_img_ori.shape[:2] target_size = 640 over_lap = 64 if img_height > img_width: r_h = target_size * 2 - over_lap r_w = img_width * (target_size * 2 - over_lap) // img_height else: r_w = target_size * 2 - over_lap r_h = img_height * (target_size * 2 - over_lap) // img_width src_img = cv2.resize(src_img_ori, (r_w, r_h)) shape_list_ori = np.array([[ img_height, img_width, float(r_h) / img_height, float(r_w) / img_width ]]) img_height, img_width = src_img.shape[:2] cropped_img_list, crop_positions = resize_image(src_img, size=(target_size, target_size), over_lap=over_lap) image_list = [] shape_list = [] for img in cropped_img_list: batch_i = self.transform({'image': img}, self.ops[-3:-1]) image_list.append(batch_i['image']) shape_list.append([640, 640, 1, 1]) images = np.array(image_list) shape_list = np.array(shape_list) images = torch.from_numpy(images).to(device=self.device) with torch.no_grad(): t_start = time.time() preds = self.model(images) t_cost = time.time() - t_start preds['maps'] = restore_preds(preds['maps'], crop_positions, (img_height, img_width)) post_result = self.post_process_class(preds, shape_list_ori) info = {'boxes': post_result[0]['points'], 'elapse': t_cost} results.append(info) return results def __call__(self, img_path=None, img_numpy_list=None, img_numpy=None, return_mask=False, **kwargs): """ 对输入图像进行处理,并返回处理结果。 Args: img_path (str, optional): 图像文件路径。默认为 None。 img_numpy_list (list, optional): 图像数据列表,每个元素为 numpy 数组。默认为 None。 img_numpy (numpy.ndarray, optional): 图像数据,numpy 数组格式。默认为 None。 Returns: list: 包含处理结果的列表。每个元素为一个字典,包含 'boxes' 和 'elapse' 两个键。 'boxes' 的值为检测到的目标框点集,'elapse' 的值为处理时间。 Raises: Exception: 若没有提供图像路径或 numpy 数组,则抛出异常。 """ if img_numpy is not None: img_numpy_list = [img_numpy] num_img = 1 elif img_path is not None: img_path = get_image_file_list(img_path) num_img = len(img_path) elif img_numpy_list is not None: num_img = len(img_numpy_list) else: raise Exception('No input image path or numpy array.') results = [] for img_idx in range(num_img): if img_numpy_list is not None: img = img_numpy_list[img_idx] data = {'image': img} elif img_path is not None: with open(img_path[img_idx], 'rb') as f: img = f.read() data = {'image': img} data = self.transform(data, self.ops[:1]) if kwargs.get('det_input_size', None) is not None: data['max_sile_len'] = kwargs['det_input_size'] batch = self.transform(data, self.ops[1:]) images = np.expand_dims(batch[0], axis=0) shape_list = np.expand_dims(batch[1], axis=0) images = torch.from_numpy(images).to(device=self.device) with torch.no_grad(): t_start = time.time() preds = self.model(images) t_cost = time.time() - t_start post_result = self.post_process_class(preds, shape_list, **kwargs) info = {'boxes': post_result[0]['points'], 'elapse': t_cost} if return_mask: if isinstance(preds['maps'], torch.Tensor): mask = preds['maps'].detach().cpu().numpy() else: mask = preds['maps'] info['mask'] = mask results.append(info) return results @torch.no_grad() def main(cfg): is_visualize = cfg['Global'].get('is_visualize', False) model = OpenDetector(cfg) save_res_path = './det_results/' if not os.path.exists(save_res_path): os.makedirs(save_res_path) sample_num = 0 with open(save_res_path + '/det_results.txt', 'wb') as fout: for file in get_image_file_list(cfg['Global']['infer_img']): preds_result = model(img_path=file)[0] logger.info('{} infer_img: {}, time cost: {}'.format( sample_num, file, preds_result['elapse'])) boxes = preds_result['boxes'] dt_boxes_json = [] for box in boxes: tmp_json = {} tmp_json['points'] = np.array(box).tolist() dt_boxes_json.append(tmp_json) if is_visualize: src_img = cv2.imread(file) draw_det_res(boxes, src_img, file, save_res_path) logger.info('The detected Image saved in {}'.format( os.path.join(save_res_path, os.path.basename(file)))) otstr = file + '\t' + json.dumps(dt_boxes_json) + '\n' logger.info('results: {}'.format(json.dumps(dt_boxes_json))) fout.write(otstr.encode()) sample_num += 1 logger.info( f"Results saved to {os.path.join(save_res_path, 'det_results.txt')}.)" ) logger.info('success!') if __name__ == '__main__': FLAGS = ArgsParser().parse_args() cfg = Config(FLAGS.config) FLAGS = vars(FLAGS) opt = FLAGS.pop('opt') cfg.merge_dict(FLAGS) cfg.merge_dict(opt) main(cfg.cfg)