import streamlit as st
import open_clip
import torch
import requests
from PIL import Image, UnidentifiedImageError
from io import BytesIO
import time
import numpy as np
from transformers import pipeline
import chromadb
from sklearn.metrics.pairwise import euclidean_distances, cosine_similarity
import faiss
# Load segmentation model (cached so Streamlit reruns don't reload it)
@st.cache_resource
def load_segmenter():
    return pipeline("image-segmentation", model="mattmdjaga/segformer_b2_clothes")

segmenter = load_segmenter()
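# The image-segmentation pipeline yields one dict per detected region, roughly of the form
#   [{'label': 'Upper-clothes', 'score': None, 'mask': <PIL.Image mode=L>}, ...]
# segment_clothing() below keeps only the masks whose label is a clothing category.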
# Load CLIP model and tokenizer
@st.cache_resource
def load_clip_model():
    model, preprocess_train, preprocess_val = open_clip.create_model_and_transforms('hf-hub:Marqo/marqo-fashionSigLIP')
    tokenizer = open_clip.get_tokenizer('hf-hub:Marqo/marqo-fashionSigLIP')
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()  # inference only
    return model, preprocess_val, tokenizer, device

clip_model, preprocess_val, tokenizer, device = load_clip_model()
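# A text query could be embedded the same way (hypothetical usage, not wired into this app):
#   tokens = tokenizer(["a red floral dress"]).to(device)
#   with torch.no_grad():
#       text_features = clip_model.encode_text(tokens)
#       text_features /= text_features.norm(dim=-1, keepdim=True)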
# Load chromaDB
client = chromadb.PersistentClient(path="./fashion_multimodal_db")
collection = client.get_collection(name="fashion_multimodal")
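# get_collection raises if "fashion_multimodal" has not been created yet; when first
# populating the DB, client.get_or_create_collection(...) would be used instead.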
# Helper functions
def load_image_from_url(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content)).convert('RGB')
            return img
        except (requests.RequestException, UnidentifiedImageError):
            if attempt < max_retries - 1:
                time.sleep(1)  # brief backoff before retrying
            else:
                return None
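# E.g. load_image_from_url("https://example.com/dress.jpg") returns an RGB PIL.Image
# on success, or None after max_retries failed attempts (the caller checks for None).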
@st.cache_resource
def initialize_faiss_index(_collection):  # leading underscore tells Streamlit not to hash this argument
    # Fetch all embeddings and metadata, converting embeddings to a float32 numpy array
    all_data = _collection.get(include=['embeddings', 'metadatas'])
    all_embeddings = np.array(all_data['embeddings']).astype('float32')
    all_metadatas = all_data['metadatas']
    # Build the faiss index and add the embeddings
    dimension = all_embeddings.shape[1]
    index = faiss.IndexFlatIP(dimension)  # IndexFlatIP scores by inner product (cosine similarity on normalized vectors)
    index.add(all_embeddings)
    return index, all_metadatas

faiss_index, all_metadatas = initialize_faiss_index(collection)
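# Note: IndexFlatIP returns raw inner products, which equal cosine similarity only if
# the stored vectors were L2-normalized at indexing time. If that is not guaranteed,
# a minimal safeguard is to normalize in place before index.add():
#   faiss.normalize_L2(all_embeddings)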
# Extract an embedding from the image restricted to the segmentation mask
def get_segmented_embedding(img, final_mask):
    img_array = np.array(img)
    final_mask_array = np.array(final_mask)
    # Apply the mask (paint the background white)
    img_array[final_mask_array == 0] = 255
    masked_img = Image.fromarray(img_array)
    # Extract the embedding from the masked image
    image_tensor = preprocess_val(masked_img).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = clip_model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)
    return image_features.cpu().numpy().flatten()
def get_image_embedding(image):
    image_tensor = preprocess_val(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = clip_model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)  # normalize
    return image_features.cpu().numpy().flatten()
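# get_image_embedding embeds the full frame, while get_segmented_embedding embeds only
# the masked clothing region; the search flow below uses the segmented variant.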
def segment_clothing(img, clothes=["Hat", "Upper-clothes", "Skirt", "Pants", "Dress", "Belt", "Left-shoe", "Right-shoe", "Scarf"]):
    segments = segmenter(img)
    mask_list = []
    detected_categories = []
    for s in segments:
        if s['label'] in clothes:
            mask_list.append(s['mask'])
            detected_categories.append(s['label'])
    # Union of the per-item masks
    final_mask = np.zeros_like(np.array(img)[:, :, 0])
    for mask in mask_list:
        current_mask = np.array(mask)
        final_mask = np.maximum(final_mask, current_mask)
    # Pipeline masks are already 0/255; map any nonzero value to 255 rather than
    # multiplying by 255, which would overflow uint8
    final_mask = Image.fromarray(np.where(final_mask > 0, 255, 0).astype(np.uint8))
    img_with_alpha = img.convert("RGBA")
    img_with_alpha.putalpha(final_mask)
    return img_with_alpha.convert("RGB"), final_mask, detected_categories
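# Minimal usage sketch (assuming `img` is an RGB PIL image):
#   segmented, mask, cats = segment_clothing(img)
#   emb = get_segmented_embedding(img, mask)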
#def find_similar_images(query_embedding, collection, top_k=5):
#    # Query ChromaDB for the most similar images.
#    results = collection.query(
#        query_embeddings=query_embedding.reshape(1, -1),  # reshape to a 2D array
#        n_results=top_k,
#        include=['metadatas', 'embeddings']  # include embeddings in the results
#    )
#
#    # Extract the metadata and embeddings.
#    top_metadatas = results['metadatas'][0]
#    top_embeddings = results['embeddings'][0]
#
#    # Compute Euclidean distances
#    distances = euclidean_distances(query_embedding.reshape(1, -1), top_embeddings).flatten()
#
#    structured_results = []
#    for metadata, distance in zip(top_metadatas, distances):
#        structured_results.append({
#            'info': metadata,
#            'similarity': 1 / (1 + distance)  # distance-based similarity (smaller distance = higher similarity)
#        })
#    return structured_results
def find_similar_images(query_embedding, faiss_index, all_metadatas, top_k=5):
    query_embedding = query_embedding.astype('float32').reshape(1, -1)  # reshape and cast for faiss
    _, indices = faiss_index.search(query_embedding, top_k)  # indices of the most similar vectors
    # Look up metadata and similarity for each returned index
    structured_results = []
    for idx in indices[0]:
        metadata = all_metadatas[idx]
        similarity = cosine_similarity(query_embedding, faiss_index.reconstruct(int(idx)).reshape(1, -1))[0][0]
        structured_results.append({
            'info': metadata,
            'similarity': similarity
        })
    return structured_results
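# Note: faiss_index.search already returns inner-product scores, so for normalized
# vectors those could serve directly as cosine similarities:
#   scores, indices = faiss_index.search(query_embedding, top_k)
#   similarity = float(scores[0][i])
# The reconstruct() + cosine_similarity route above stays correct even if some stored
# vectors were not normalized.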
#def find_similar_images(query_embedding, collection, top_k=5, batch_size=500):
#    query_embedding = query_embedding.reshape(1, -1)  # reshape the query embedding
#
#    # Fetch all embeddings and metadata at once
#    all_data = collection.get(include=['embeddings', 'metadatas'])
#    all_embeddings = np.array(all_data['embeddings'])
#    all_metadatas = all_data['metadatas']
#
#    all_results = []
#
#    # Process the data in chunks of batch_size
#    for start in range(0, len(all_embeddings), batch_size):
#        end = start + batch_size
#        batch_embeddings = all_embeddings[start:end]
#        batch_metadatas = all_metadatas[start:end]
#
#        # Compute cosine similarities
#        similarities = cosine_similarity(query_embedding, batch_embeddings).flatten()
#
#        # Pair each similarity with its metadata for this batch
#        batch_results = [{'info': metadata, 'similarity': similarity} for similarity, metadata in zip(similarities, batch_metadatas)]
#        all_results.extend(batch_results)
#
#    # Keep only the top_k results, sorted by similarity in descending order
#    sorted_results = sorted(all_results, key=lambda x: x['similarity'], reverse=True)[:top_k]
#
#    return sorted_results
# Initialize session state
if 'step' not in st.session_state:
    st.session_state.step = 'input'
if 'query_image_url' not in st.session_state:
    st.session_state.query_image_url = ''
if 'detections' not in st.session_state:
    st.session_state.detections = []
if 'segmented_image' not in st.session_state:
    st.session_state.segmented_image = None
if 'final_mask' not in st.session_state:
    st.session_state.final_mask = None
if 'selected_category' not in st.session_state:
    st.session_state.selected_category = None
# Streamlit app
st.title("Advanced Fashion Search App")

if st.session_state.step == 'input':
    st.session_state.query_image_url = st.text_input("Enter image URL:", st.session_state.query_image_url)
    if st.button("Detect Clothing"):
        if st.session_state.query_image_url:
            query_image = load_image_from_url(st.session_state.query_image_url)
            if query_image is not None:
                st.session_state.query_image = query_image
                segmented_image, final_mask, detected_categories = segment_clothing(query_image)
                st.session_state.segmented_image = segmented_image
                st.session_state.final_mask = final_mask  # keep the mask for the embedding step later
                st.session_state.detections = detected_categories
                st.image(segmented_image, caption="Segmented Image", use_column_width=True)
                if st.session_state.detections:
                    st.session_state.step = 'select_category'
                else:
                    st.warning("No clothing items detected in the image.")
            else:
                st.error("Failed to load the image. Please try another URL.")
        else:
            st.warning("Please enter an image URL.")
elif st.session_state.step == 'select_category':
    st.image(st.session_state.segmented_image, caption="Segmented Image with Detected Categories", use_column_width=True)
    st.subheader("Detected Clothing Categories:")
    if st.session_state.detections:
        selected_category = st.selectbox("Select a category to search:", st.session_state.detections)
        if st.button("Search Similar Items"):
            st.session_state.selected_category = selected_category
            st.session_state.step = 'show_results'
    else:
        st.warning("No categories detected.")
elif st.session_state.step == 'show_results':
    original_image = st.session_state.query_image.convert("RGB")
    st.image(original_image, caption="Original Image", use_column_width=True)
    # Extract the embedding from the mask-restricted image (pass the mask itself,
    # not the segmented RGB image)
    query_embedding = get_segmented_embedding(st.session_state.query_image, st.session_state.final_mask)
    #query_embedding = get_image_embedding(st.session_state.segmented_image)
    similar_images = find_similar_images(query_embedding, faiss_index, all_metadatas)
    st.subheader("Similar Items:")
    for img in similar_images:
        col1, col2 = st.columns(2)
        with col1:
            st.image(img['info']['image_url'], use_column_width=True)
        with col2:
            st.write(f"Name: {img['info']['name']}")
            st.write(f"Brand: {img['info']['brand']}")
            category = img['info'].get('category')
            if category:
                st.write(f"Category: {category}")
            st.write(f"Price: {img['info']['price']}")
            st.write(f"Discount: {img['info']['discount']}%")
            st.write(f"Similarity: {img['similarity']:.2f}")
    if st.button("Start New Search"):
        st.session_state.step = 'input'
        st.session_state.query_image_url = ''
        st.session_state.detections = []
        st.session_state.segmented_image = None
        st.session_state.final_mask = None