from __future__ import unicode_literals import re import unicodedata import torch import streamlit as st import pandas as pd import pyarrow as pa import pyarrow.parquet as pq import numpy as np import scipy.spatial from transformers import BertJapaneseTokenizer, BertModel import pyminizip def unicode_normalize(cls, s): pt = re.compile("([{}]+)".format(cls)) def norm(c): return unicodedata.normalize("NFKC", c) if pt.match(c) else c s = "".join(norm(x) for x in re.split(pt, s)) s = re.sub("-", "-", s) return s def remove_extra_spaces(s): s = re.sub("[  ]+", " ", s) blocks = "".join( ( "\u4E00-\u9FFF", # CJK UNIFIED IDEOGRAPHS "\u3040-\u309F", # HIRAGANA "\u30A0-\u30FF", # KATAKANA "\u3000-\u303F", # CJK SYMBOLS AND PUNCTUATION "\uFF00-\uFFEF", # HALFWIDTH AND FULLWIDTH FORMS ) ) basic_latin = "\u0000-\u007F" def remove_space_between(cls1, cls2, s): p = re.compile("([{}]) ([{}])".format(cls1, cls2)) while p.search(s): s = p.sub(r"\1\2", s) return s s = remove_space_between(blocks, blocks, s) s = remove_space_between(blocks, basic_latin, s) s = remove_space_between(basic_latin, blocks, s) return s def normalize_neologd(s): s = s.strip() s = unicode_normalize("0-9A-Za-z。-゚", s) def maketrans(f, t): return {ord(x): ord(y) for x, y in zip(f, t)} s = re.sub("[˗֊‐‑‒–⁃⁻₋−]+", "-", s) # normalize hyphens s = re.sub("[﹣-ー—―─━ー]+", "ー", s) # normalize choonpus s = re.sub("[~∼∾〜〰~]+", "〜", s) # normalize tildes (modified by Isao Sonobe) s = s.translate( maketrans( "!\"#$%&'()*+,-./:;<=>?@[¥]^_`{|}~。、・「」", "!”#$%&’()*+,-./:;<=>?@[¥]^_`{|}〜。、・「」", ) ) s = remove_extra_spaces(s) s = unicode_normalize("!”#$%&’()*+,-./:;<>?@[¥]^_`{|}〜", s) # keep =,・,「,」 s = re.sub("[’]", "'", s) s = re.sub("[”]", '"', s) # s = s.upper() return s def normalize_text(text): return normalize_neologd(text) def normalize_title(title): title = title.strip() match = re.match(r"^「([^」]+)」$", title) if match: title = match.group(1) match = re.match(r"^POP素材「([^」]+)」$", title) if match: title = match.group(1) match = re.match( r"^(.*?)(の?(?:イラスト|イラストの|イラストト|イ子のラスト|イラス|イラスト文字|「イラスト文字」|イラストPOP文字|ペンキ文字|タイトル文字|イラスト・メッセージ|イラスト文字・バナー|キャラクター(たち)?|マーク|アイコン|シルエット|シルエット素材|フレーム(枠)|フレーム|フレーム素材|テンプレート|パターン|パターン素材|ライン素材|コーナー素材|リボン型バナー|評価スタンプ|背景素材))+(\s*([0-90-9]*|その[0-90-9]+)\s*(((|\()[^))]+()|\))|「[^」]+」|・.+)*(です。)?)", title, ) if match: title = match.group(1) + ("" if match.group(3) is None else match.group(3)) if title == "": raise ValueError(title) title = normalize_text(title) return title class SentenceBertJapanese: def __init__(self, model_name_or_path, device=None): self.tokenizer = BertJapaneseTokenizer.from_pretrained(model_name_or_path) self.model = BertModel.from_pretrained(model_name_or_path) self.model.eval() if device is None: device = "cuda" if torch.cuda.is_available() else "cpu" self.device = torch.device(device) self.model.to(device) def _mean_pooling(self, model_output, attention_mask): token_embeddings = model_output[ 0 ] # First element of model_output contains all token embeddings input_mask_expanded = ( attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() ) return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp( input_mask_expanded.sum(1), min=1e-9 ) @torch.no_grad() def encode(self, sentences, batch_size=8): all_embeddings = [] iterator = range(0, len(sentences), batch_size) for batch_idx in iterator: batch = sentences[batch_idx : batch_idx + batch_size] encoded_input = self.tokenizer.batch_encode_plus( batch, padding="longest", truncation=True, return_tensors="pt" ).to(self.device) model_output = self.model(**encoded_input) sentence_embeddings = self._mean_pooling( model_output, encoded_input["attention_mask"] ).to("cpu") all_embeddings.extend(sentence_embeddings) # return torch.stack(all_embeddings).numpy() return torch.stack(all_embeddings) st.title("いらすと検索") description_text = st.empty() description_text.text("...モデル読み込み中...") model = SentenceBertJapanese("sonoisa/sentence-bert-base-ja-mean-tokens") pyminizip.uncompress( "irasuto_items_20210224.pq.zip", st.secrets["ZIP_PASSWORD"], None, 1 ) df = pq.read_table("irasuto_items_20210224.parquet").to_pandas() sentence_vectors = np.stack(df["sentence_vector"]) description_text.text("説明文の意味が近い「いらすとや」画像を検索します。") prev_query = "" query_input = st.text_input(label="説明文", value="") search_buttion = st.button("検索") closest_n = 10 if search_buttion or prev_query != str(query_input): query = str(query_input) prev_query = query query_embedding = model.encode([query]).numpy() distances = scipy.spatial.distance.cdist( query_embedding, sentence_vectors, metric="euclidean" )[0] results = zip(range(len(distances)), distances) results = sorted(results, key=lambda x: x[1]) for i, (idx, distance) in enumerate(results[0:closest_n]): md_content = "" page_url = df.iloc[idx]["page"] for img_url in df.iloc[idx]["images"]: md_content += f'' md_content += f'\n[%.4f {df.iloc[idx]["title"]}]({page_url})' % (distance / 2) md_content += f'\n{df.iloc[idx]["normalized_description"]}' st.markdown(md_content, unsafe_allow_html=True)