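# Spaces: Sleeping
# NOTE(review): the line above appears to be hosting-page status text captured
# together with this file; it is not part of the program.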
import streamlit as st | |
import open_clip | |
import torch | |
import requests | |
from PIL import Image | |
from io import BytesIO | |
import time | |
import numpy as np | |
from transformers import pipeline | |
import chromadb | |
from sklearn.metrics.pairwise import euclidean_distances | |
from sklearn.preprocessing import normalize | |
from sklearn.metrics.pairwise import cosine_similarity | |
import faiss | |
# Load the clothing segmentation model.
@st.cache_resource
def _load_segmenter():
    """Build the clothes-segmentation pipeline once per server process.

    Streamlit re-executes this script from top to bottom on every widget
    interaction. Caching the pipeline as a resource keeps the model from
    being re-created on each of those reruns.
    """
    return pipeline(model="mattmdjaga/segformer_b2_clothes")


segmenter = _load_segmenter()
# Load the CLIP-style embedding model and its tokenizer.
@st.cache_resource
def load_clip_model():
    """Load the Marqo fashionSigLIP model, its eval transform and tokenizer.

    The result is cached with ``st.cache_resource`` so the weights are loaded
    once per server process instead of on every Streamlit rerun.

    Returns:
        tuple: ``(model, preprocess_val, tokenizer, device)`` where ``model``
        has been moved to ``device`` (CUDA when available, otherwise CPU) and
        switched to evaluation mode.
    """
    model, _preprocess_train, preprocess_val = open_clip.create_model_and_transforms(
        'hf-hub:Marqo/marqo-fashionSigLIP'
    )
    tokenizer = open_clip.get_tokenizer('hf-hub:Marqo/marqo-fashionSigLIP')
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # This model is only used for inference; eval mode disables train-time
    # behaviour such as dropout so embeddings are deterministic.
    model.eval()
    return model, preprocess_val, tokenizer, device
clip_model, preprocess_val, tokenizer, device = load_clip_model()

# Open the on-disk ChromaDB store and look up the existing collection that
# holds the catalogue embeddings and their metadata.
CHROMA_DB_PATH = "./fashion_multimodal_db"
CHROMA_COLLECTION_NAME = "fashion_multimodal"
client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
collection = client.get_collection(name=CHROMA_COLLECTION_NAME)
# Helper functions | |
def load_image_from_url(url, max_retries=3):
    """Download ``url`` and decode it as an RGB PIL image.

    Args:
        url: Address of the image to fetch.
        max_retries: Maximum number of download/decode attempts. One second
            is slept between failed attempts.

    Returns:
        The decoded image converted to ``'RGB'``, or ``None`` when every
        attempt failed (or ``max_retries`` is not positive).
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            # ``convert`` forces the pixel data to be decoded, so truncated or
            # corrupt files fail here rather than later in the pipeline.
            return Image.open(BytesIO(response.content)).convert('RGB')
        except (requests.RequestException, OSError):
            # ``OSError`` covers ``Image.UnidentifiedImageError`` (a subclass)
            # as well as decode errors raised while loading the pixel data.
            if attempt < max_retries - 1:
                time.sleep(1)
    return None
def initialize_faiss_index(collection):
    """Build an in-memory faiss inner-product index from a ChromaDB collection.

    Args:
        collection: Object whose ``get(include=[...])`` returns a mapping with
            ``'embeddings'`` and ``'metadatas'`` entries in matching order.

    Returns:
        tuple: ``(index, all_metadatas)``. Row ``i`` of the index corresponds
        to ``all_metadatas[i]``.

    Raises:
        ValueError: If the collection yields no 2-D, non-empty embedding
            matrix (for example when it is empty).
    """
    # Fetch every embedding and convert to a contiguous float32 matrix, which
    # is the layout faiss expects.
    all_data = collection.get(include=['embeddings', 'metadatas'])
    all_embeddings = np.ascontiguousarray(all_data['embeddings'], dtype=np.float32)
    if all_embeddings.ndim != 2 or all_embeddings.shape[0] == 0:
        raise ValueError(
            "Cannot build a faiss index: the collection returned no embeddings."
        )
    all_metadatas = all_data['metadatas']

    # IndexFlatIP ranks by inner product; that equals cosine similarity only
    # for unit-length vectors, so normalise the rows in place before adding.
    faiss.normalize_L2(all_embeddings)
    dimension = all_embeddings.shape[1]
    index = faiss.IndexFlatIP(dimension)
    index.add(all_embeddings)
    return index, all_metadatas
@st.cache_resource
def _load_faiss_index():
    """Build the faiss index once per server process.

    Streamlit re-executes this script on every interaction; without caching,
    all embeddings would be re-read from the collection and re-indexed on
    each rerun.
    """
    return initialize_faiss_index(collection)


faiss_index, all_metadatas = _load_faiss_index()
# Extract an embedding based on the segmentation mask
def get_segmented_embedding(img, final_mask):
    """Embed ``img`` after painting every masked-out pixel white.

    Args:
        img: Source image (anything ``np.array`` can turn into a pixel array).
        final_mask: Mask aligned with ``img``; positions equal to 0 are
            treated as background.

    Returns:
        The L2-normalised image embedding as a flat numpy array.
    """
    pixels = np.array(img)
    background = np.array(final_mask) == 0
    # Replace the background with white so only the kept region is embedded.
    pixels[background] = 255
    masked_img = Image.fromarray(pixels)

    # Embed the masked image with the CLIP model.
    batch = preprocess_val(masked_img).unsqueeze(0).to(device)
    with torch.no_grad():
        features = clip_model.encode_image(batch)
        features /= features.norm(dim=-1, keepdim=True)
    embedding = features.cpu().numpy()
    return embedding.flatten()
def get_image_embedding(image):
    """Return the L2-normalised embedding of ``image`` as a flat numpy array."""
    batch = preprocess_val(image).unsqueeze(0).to(device)
    with torch.no_grad():
        features = clip_model.encode_image(batch)
    # Scale to unit length along the feature dimension.
    scale = features.norm(dim=-1, keepdim=True)
    features /= scale
    embedding = features.cpu().numpy()
    return embedding.flatten()
def segment_clothing(
    img,
    clothes=(
        "Hat", "Upper-clothes", "Skirt", "Pants", "Dress", "Belt",
        "Left-shoe", "Right-shoe", "Scarf",
    ),
):
    """Segment the requested clothing categories out of ``img``.

    Args:
        img: PIL image to segment.
        clothes: Segment labels to keep. The default is an immutable tuple so
            it cannot be mutated across calls.

    Returns:
        tuple: ``(segmented_image, final_mask, detected_categories)`` where

        * ``segmented_image`` is an RGB image with everything outside the
          kept segments painted white,
        * ``final_mask`` is an ``'L'`` mode image that is 255 inside the kept
          segments and 0 elsewhere,
        * ``detected_categories`` lists the kept labels in the order the
          segmenter returned them.
    """
    segments = segmenter(img)

    combined = np.zeros((img.height, img.width), dtype=bool)
    detected_categories = []
    for segment in segments:
        if segment['label'] not in clothes:
            continue
        detected_categories.append(segment['label'])
        # Binarise each mask before merging so the union does not depend on
        # whether the segmenter encodes "inside" as 1 or as 255.
        combined |= np.array(segment['mask']) > 0

    # Scale the boolean union to 0/255. Multiplying a uint8 mask that already
    # holds 255 by 255 would wrap around to 1, hence the boolean intermediate.
    final_mask = Image.fromarray(combined.astype(np.uint8) * 255)

    # Converting RGBA to RGB simply discards the alpha channel, so composite
    # onto a white canvas to actually remove the background.
    rgb_img = img.convert("RGB")
    white_canvas = Image.new("RGB", rgb_img.size, (255, 255, 255))
    segmented_image = Image.composite(rgb_img, white_canvas, final_mask)
    return segmented_image, final_mask, detected_categories
#def find_similar_images(query_embedding, collection, top_k=5):
#     # Query ChromaDB for the most similar images.
#     results = collection.query(
#         query_embeddings=query_embedding.reshape(1, -1),  # convert to a 2D array
#         n_results=top_k,
#         include=['metadatas', 'embeddings']  # modified to include the embeddings
#     )
#
#     # Extract the metadata and the embeddings.
#     top_metadatas = results['metadatas'][0]
#     top_embeddings = results['embeddings'][0]  # fetch the embeddings
#
#     # Compute the Euclidean distances.
#     distances = euclidean_distances(query_embedding.reshape(1, -1), top_embeddings).flatten()
#
#     structured_results = []
#     for metadata, distance in zip(top_metadatas, distances):
#         structured_results.append({
#             'info': metadata,
#             'similarity': 1 / (1 + distance)  # distance-based similarity (the smaller the distance, the higher the similarity)
#         })
#     return structured_results
def find_similar_images(query_embedding, faiss_index, all_metadatas, top_k=5):
    """Return the ``top_k`` nearest catalogue items for ``query_embedding``.

    Args:
        query_embedding: 1-D (or ``(1, d)``) query vector.
        faiss_index: Index exposing ``search(query, k)`` and
            ``reconstruct(i)``.
        all_metadatas: Sequence where entry ``i`` describes index row ``i``.
        top_k: Maximum number of results to return.

    Returns:
        list[dict]: One ``{'info': metadata, 'similarity': float}`` entry per
        hit, in the order returned by the index. ``similarity`` is the cosine
        similarity between the query and the stored vector. Fewer than
        ``top_k`` entries are returned when the index holds fewer vectors.
    """
    if top_k <= 0:
        return []

    # faiss expects a contiguous float32 matrix of shape (n_queries, dim).
    query = np.ascontiguousarray(query_embedding, dtype='float32').reshape(1, -1)
    _, indices = faiss_index.search(query, top_k)
    query_norm = float(np.linalg.norm(query))

    structured_results = []
    for raw_idx in indices[0]:
        # Use a plain Python int: the faiss bindings can reject numpy integers.
        idx = int(raw_idx)
        if idx < 0:
            # faiss pads with -1 when fewer than top_k vectors exist; indexing
            # with it would silently pick the last metadata entry.
            continue
        candidate = np.asarray(faiss_index.reconstruct(idx), dtype='float32').reshape(-1)
        denominator = query_norm * float(np.linalg.norm(candidate))
        # A zero-length vector has no direction; report 0 rather than NaN.
        similarity = float(query[0] @ candidate) / denominator if denominator > 0 else 0.0
        structured_results.append({
            'info': all_metadatas[idx],
            'similarity': similarity,
        })
    return structured_results
#def find_similar_images(query_embedding, collection, top_k=5, batch_size=500):
#     query_embedding = query_embedding.reshape(1, -1)  # adjust the query embedding's dimensions
#
#     # Fetch all embeddings and metadata at once.
#     all_data = collection.get(include=['embeddings', 'metadatas'])
#     all_embeddings = np.array(all_data['embeddings'])
#     all_metadatas = all_data['metadatas']
#
#     all_results = []
#
#     # Process the whole data set in chunks of batch_size.
#     for start in range(0, len(all_embeddings), batch_size):
#         end = start + batch_size
#         batch_embeddings = all_embeddings[start:end]
#         batch_metadatas = all_metadatas[start:end]
#
#         # Compute the cosine similarity.
#         similarities = cosine_similarity(query_embedding, batch_embeddings).flatten()
#
#         # Pair each similarity with its metadata for the current batch and collect them.
#         batch_results = [{'info': metadata, 'similarity': similarity} for similarity, metadata in zip(similarities, batch_metadatas)]
#         all_results.extend(batch_results)
#
#     # From all results, keep only the top_k entries in descending order of similarity.
#     sorted_results = sorted(all_results, key=lambda x: x['similarity'], reverse=True)[:top_k]
#
#     return sorted_results
# Initialize session state
# Default value for each session key; a key is only seeded when it is not
# already present, so values from earlier reruns are kept.
_SESSION_DEFAULTS = {
    'step': 'input',            # current wizard step
    'query_image_url': '',      # URL typed by the user
    'detections': [],           # clothing labels found in the query image
    'segmented_image': None,    # segmented preview of the query image
    'selected_category': None,  # label chosen by the user
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
# Streamlit app: a three-step flow driven by ``st.session_state.step``
#   'input'           -> enter an image URL and run clothing detection
#   'select_category' -> pick one of the detected categories
#   'show_results'    -> embed the masked query image and list similar items
# Each step change is followed by ``st.rerun()`` so the next step renders
# immediately instead of waiting for another user interaction.
st.title("Advanced Fashion Search App")

if st.session_state.step == 'input':
    st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url)
    if st.button("Detect Clothing"):
        if st.session_state.query_image_url:
            query_image = load_image_from_url(st.session_state.query_image_url)
            if query_image is not None:
                st.session_state.query_image = query_image
                segmented_image, final_mask, detected_categories = segment_clothing(query_image)
                st.session_state.segmented_image = segmented_image
                # Keep the mask: the results step needs it to blank out the
                # background before embedding.
                st.session_state.final_mask = final_mask
                st.session_state.detections = detected_categories
                if st.session_state.detections:
                    st.session_state.step = 'select_category'
                    st.rerun()
                else:
                    st.image(segmented_image, caption="Segmented Image", use_column_width=True)
                    st.warning("No clothing items detected in the image.")
            else:
                st.error("Failed to load the image. Please try another URL.")
        else:
            st.warning("Please enter an image URL.")

elif st.session_state.step == 'select_category':
    st.image(st.session_state.segmented_image, caption="Segmented Image with Detected Categories", use_column_width=True)
    st.subheader("Detected Clothing Categories:")
    if st.session_state.detections:
        selected_category = st.selectbox("Select a category to search:", st.session_state.detections)
        if st.button("Search Similar Items"):
            # NOTE(review): the selected category is recorded but the search
            # below still uses the combined mask of all detected categories.
            st.session_state.selected_category = selected_category
            st.session_state.step = 'show_results'
            st.rerun()
    else:
        st.warning("No categories detected.")

elif st.session_state.step == 'show_results':
    original_image = st.session_state.query_image.convert("RGB")
    st.image(original_image, caption="Original Image", use_column_width=True)

    # Extract the embedding from the query image with the background masked
    # out; the second argument must be the segmentation mask, not the
    # segmented preview image.
    query_embedding = get_segmented_embedding(st.session_state.query_image, st.session_state.final_mask)
    similar_images = find_similar_images(query_embedding, faiss_index, all_metadatas)

    st.subheader("Similar Items:")
    for img in similar_images:
        col1, col2 = st.columns(2)
        with col1:
            st.image(img['info']['image_url'], use_column_width=True)
        with col2:
            st.write(f"Name: {img['info']['name']}")
            st.write(f"Brand: {img['info']['brand']}")
            category = img['info'].get('category')
            if category:
                st.write(f"Category: {category}")
            st.write(f"Price: {img['info']['price']}")
            st.write(f"Discount: {img['info']['discount']}%")
            st.write(f"Similarity: {img['similarity']:.2f}")

    if st.button("Start New Search"):
        # Clear everything tied to the previous query so nothing stale leaks
        # into the next search.
        st.session_state.step = 'input'
        st.session_state.query_image_url = ''
        st.session_state.detections = []
        st.session_state.segmented_image = None
        st.session_state.final_mask = None
        st.session_state.selected_category = None
        st.session_state.query_image = None
        st.rerun()