|
--- |
|
license: mit |
|
--- |
|
|
|
Model convert from [https://github.com/KichangKim/DeepDanbooru](https://github.com/KichangKim/DeepDanbooru) |
|
|
|
## Usage: |
|
|
|
### Basic use |
|
|
|
```python |
|
import cv2 |
|
import numpy as np |
|
import onnxruntime as rt |
|
from huggingface_hub import hf_hub_download |
|
|
|
tagger_model_path = hf_hub_download(repo_id="skytnt/deepdanbooru_onnx", filename="deepdanbooru.onnx") |
|
|
|
tagger_model = rt.InferenceSession(tagger_model_path, providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) |
|
tagger_model_meta = tagger_model.get_modelmeta().custom_metadata_map |
|
tagger_tags = eval(tagger_model_meta['tags']) |
|
|
|
def tagger_predict(image, score_threshold): |
|
s = 512 |
|
h, w = image.shape[:-1] |
|
h, w = (s, int(s * w / h)) if h > w else (int(s * h / w), s) |
|
ph, pw = s - h, s - w |
|
image = cv2.resize(image, (w, h), interpolation=cv2.INTER_AREA) |
|
image = cv2.copyMakeBorder(image, ph // 2, ph - ph // 2, pw // 2, pw - pw // 2, cv2.BORDER_REPLICATE) |
|
image = image.astype(np.float32) / 255 |
|
image = img_new[np.newaxis, :] |
|
probs = tagger_model.run(None, {"input_1": image})[0][0] |
|
probs = probs.astype(np.float32) |
|
res = [] |
|
for prob, label in zip(probs.tolist(), tagger_tags): |
|
if prob < score_threshold: |
|
continue |
|
res.append(label) |
|
return res |
|
|
|
img = cv2.imread("test.jpg") |
|
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
|
tags = tagger_predict(img, 0.5) |
|
print(tags) |
|
``` |
|
|
|
### Multi-gpu batch process |
|
|
|
|
|
```python |
|
import cv2 |
|
import torch |
|
import os |
|
import numpy as np |
|
import onnxruntime as rt |
|
from huggingface_hub import hf_hub_download |
|
from torch.utils.data import DataLoader, Dataset |
|
from PIL import Image |
|
from tqdm import tqdm |
|
from threading import Thread |
|
|
|
|
|
class MyDataset(Dataset): |
|
def __init__(self, image_list): |
|
self.image_list = image_list |
|
|
|
def __len__(self): |
|
length = len(self.image_list) |
|
return length |
|
|
|
def __getitem__(self, index): |
|
image = Image.open(self.image_list[index]).convert("RGB") |
|
image = np.asarray(image) |
|
s = 512 |
|
h, w = image.shape[:-1] |
|
h, w = (s, int(s * w / h)) if h > w else (int(s * h / w), s) |
|
ph, pw = s - h, s - w |
|
image = cv2.resize(image, (w, h), interpolation=cv2.INTER_AREA) |
|
image = cv2.copyMakeBorder(image, ph // 2, ph - ph // 2, pw // 2, pw - pw // 2, cv2.BORDER_REPLICATE) |
|
image = image.astype(np.float32) / 255 |
|
image = torch.from_numpy(image) |
|
idx = torch.tensor([index], dtype=torch.int32) |
|
return image, idx |
|
|
|
|
|
def get_images(path): |
|
def file_ext(fname): |
|
return os.path.splitext(fname)[1].lower() |
|
|
|
all_files = { |
|
os.path.relpath(os.path.join(root, fname), path) |
|
for root, _dirs, files in os.walk(path) |
|
for fname in files |
|
} |
|
all_images = sorted( |
|
os.path.join(path, fname) for fname in all_files if file_ext(fname) in [".png", ".jpg", ".jpeg"] |
|
) |
|
print(len(all_images)) |
|
return all_images |
|
|
|
|
|
def process(all_images, batch_size=8, score_threshold=0.35): |
|
predictions = {} |
|
|
|
def work_fn(images, device_id): |
|
dataset = MyDataset(images) |
|
dataloader = DataLoader( |
|
dataset, |
|
batch_size=batch_size, |
|
shuffle=False, |
|
persistent_workers=True, |
|
num_workers=4, |
|
pin_memory=True, |
|
) |
|
for data in tqdm(dataloader): |
|
image, idxs = data |
|
image = image.numpy() |
|
probs = tagger_model[device_id].run(None, {"input_1": image})[0] |
|
probs = probs.astype(np.float32) |
|
bs = probs.shape[0] |
|
for i in range(bs): |
|
tags = [] |
|
for prob, label in zip(probs[i].tolist(), tagger_tags): |
|
if prob > score_threshold: |
|
tags.append((label, prob)) |
|
predictions[images[idxs[i].item()]] = tags |
|
|
|
gpu_num = len(tagger_model) |
|
image_num = (len(all_images) // gpu_num) + 1 |
|
ts = [Thread(target=work_fn, args=(all_images[i * image_num:(i + 1) * image_num], i)) for i in range(gpu_num)] |
|
for t in ts: |
|
t.start() |
|
for t in ts: |
|
t.join() |
|
return predictions |
|
|
|
|
|
gpu_num = 4 |
|
batch_size = 8 |
|
tagger_model_path = hf_hub_download(repo_id="skytnt/deepdanbooru_onnx", filename="deepdanbooru.onnx") |
|
tagger_model = [ |
|
rt.InferenceSession(tagger_model_path, providers=['CUDAExecutionProvider'], provider_options=[{'device_id': i}]) for |
|
i in range(gpu_num)] |
|
tagger_model_meta = tagger_model[0].get_modelmeta().custom_metadata_map |
|
tagger_tags = eval(tagger_model_meta['tags']) |
|
|
|
all_images = get_images("./data") |
|
predictions = process(all_images, batch_size) |
|
``` |