# -*- coding: utf-8 -*-
"""Batch-convert a directory tree of images to RGB WebP files.

Each readable input image is optionally downscaled, flattened onto a white
background when it may carry transparency, converted to RGB, saved as WebP and
accompanied by a JSON sidecar holding its final width, height and pixel count.
"""
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterator, Optional, Tuple

import numpy as np
from PIL import Image
from tqdm import tqdm

# Setting this to None turns off Pillow's decompression-bomb size check so
# that very large images can still be opened.
Image.MAX_IMAGE_PIXELS = None

logger = logging.getLogger(__name__)

# Upper bounds enforced by resize_image(): total pixel count and longest side.
max_pixels = 2048 * 2048
max_long_size = 4096

# File extensions (compared case-insensitively) picked up by get_image_paths().
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')


def has_alpha(img: Image.Image) -> bool:
    """Return True if *img* has an alpha band or is a palette image.

    Palette ('P') images are reported as True as well, so callers flatten
    them onto a background just like images with an explicit alpha band.
    """
    return any(band in {'A', 'a', 'P'} for band in img.getbands())


def add_white_background(img: Image.Image) -> Image.Image:
    """Composite *img* over an opaque white canvas and return the RGBA result."""
    img = img.convert('RGBA')
    background = Image.new('RGBA', img.size, (255, 255, 255))
    return Image.alpha_composite(background, img)


def resize_image(image: Image.Image) -> Image.Image:
    """Downscale *image* so it respects ``max_long_size`` and ``max_pixels``.

    The aspect ratio is preserved. The image is returned unchanged when it is
    already within both limits.
    """
    width, height = image.size
    max_side = max(width, height)
    current_pixels = width * height

    if max_side <= max_long_size and current_pixels <= max_pixels:
        return image

    # Use the stricter of the two constraints (longest side vs. pixel budget).
    scale = min(max_long_size / max_side, (max_pixels / current_pixels) ** 0.5)
    # Clamp to 1 so extremely elongated images do not collapse to a
    # zero-sized dimension, which Image.resize() rejects.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))
    return image.resize((new_width, new_height), Image.BICUBIC)


def load_image(image_path: str) -> Optional[Image.Image]:
    """Load *image_path* and normalise it to a size-limited RGB image.

    Returns:
        The processed RGB image, or ``None`` if the file could not be read or
        processed (the failure is logged as a warning).
    """
    try:
        with Image.open(image_path) as img:
            img.load()  # force the pixel data to be decoded into memory
            np.array(img)  # extra sanity check: the data must convert to an array
            img = resize_image(img)
            if has_alpha(img):
                # Drop transparency by flattening onto a white background.
                img = add_white_background(img)
            if img.mode != "RGB":
                img = img.convert("RGB")
            return img
    except Exception as exc:  # best effort: a bad file must not stop the batch
        logger.warning("Skipping unreadable image %s: %s", image_path, exc)
        return None


def get_image_metainfo(img) -> Optional[Dict[str, int]]:
    """Return width, height and pixel count of *img*, or ``None`` if *img* is None."""
    if img is None:
        return None
    width, height = img.size
    return {
        'width': width,
        'height': height,
        'pixel_num': width * height,
    }


def process_image(input_image_path: str, output_image_path: str) -> bool:
    """Convert one image to WebP and write its metadata JSON next to it.

    The JSON sidecar shares the output path's stem and uses a ``.json``
    extension.

    Returns:
        True if the image and its sidecar were written, False if the input
        could not be loaded and was skipped.
    """
    img = load_image(input_image_path)
    if img is None:
        return False
    image_metainfo = get_image_metainfo(img)

    # Swap only the final extension; a plain str.replace would also rewrite
    # ".webp" occurring elsewhere in the path (e.g. in a directory name).
    output_image_json_path = os.path.splitext(output_image_path)[0] + ".json"

    img.save(output_image_path, "WEBP", quality=90)
    with open(output_image_json_path, 'w', encoding='utf-8') as f:
        json.dump(image_metainfo, f, indent=4)
    return True


def get_image_paths(input_dir, output_dir) -> Iterator[Tuple[str, str]]:
    """Yield ``(input_path, output_path)`` pairs for images under *input_dir*.

    The directory layout below *input_dir* is mirrored under *output_dir*,
    every output file gets a ``.webp`` extension, and the output directory of
    each pair is created before the pair is yielded.
    """
    for root, _, files in os.walk(input_dir):
        for file in files:
            if not file.lower().endswith(_IMAGE_EXTENSIONS):
                continue
            input_path = os.path.join(root, file)
            rel_path = os.path.relpath(input_path, input_dir)
            output_path = os.path.join(
                output_dir, os.path.splitext(rel_path)[0] + '.webp')
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            yield input_path, output_path


def process_images_with_thread_pool(input_image_dir: str,
                                    output_image_dir: str,
                                    num_threads=16):
    """Convert every image under *input_image_dir* using a thread pool.

    Args:
        input_image_dir: Root directory that is searched recursively.
        output_image_dir: Root directory receiving the WebP/JSON outputs.
        num_threads: Number of worker threads.

    Raises:
        Exception: Any error raised by ``process_image`` (for example a failed
            save) is re-raised here. Unreadable inputs are skipped, not raised.
    """
    os.makedirs(output_image_dir, exist_ok=True)
    image_paths = get_image_paths(input_image_dir, output_image_dir)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_image, input_path, output_path)
            for input_path, output_path in image_paths
        ]
        skipped = 0
        # Consume futures as they finish so the progress bar reflects real
        # progress and no worker thread is spent merely waiting on another.
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="Processing images"):
            if not future.result():
                skipped += 1

    if skipped:
        logger.warning("%d of %d images could not be processed",
                       skipped, len(futures))


if __name__ == "__main__":
    # Single-file usage example:
    # process_image(
    #     input_image_path="test.png",
    #     output_image_path='test.webp')
    process_images_with_thread_pool(
        input_image_dir=r"20240808\unsplash-research-dataset-lite-latest\test",
        output_image_dir=r"20240808\unsplash-research-dataset-lite-latest\output",
        num_threads=16)