"""Streamlit app: segment clothing in a query image and search similar items.

Flow (tracked in ``st.session_state.step``):

1. ``input``           - the user enters an image URL; clothing is segmented.
2. ``select_category`` - the user picks one of the detected clothing labels.
3. ``show_results``    - the masked query image is embedded and the nearest
                         items from the ChromaDB collection are displayed.
"""

# Import order is kept exactly as in the original file.
import streamlit as st
import open_clip
import torch
import requests
from PIL import Image
from io import BytesIO
import time
import numpy as np
from transformers import pipeline
import chromadb
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity
import faiss

# Labels of the segmentation model that are treated as clothing.
DEFAULT_CLOTHES = (
    "Hat",
    "Upper-clothes",
    "Skirt",
    "Pants",
    "Dress",
    "Belt",
    "Left-shoe",
    "Right-shoe",
    "Scarf",
)


# Load segmentation model
@st.cache_resource
def load_segmenter():
    """Create the clothes segmentation pipeline once per server process.

    Cached with ``st.cache_resource`` so the model is not re-instantiated on
    every Streamlit rerun (same approach as ``load_clip_model``).
    """
    return pipeline(model="mattmdjaga/segformer_b2_clothes")


segmenter = load_segmenter()


# Load CLIP model and tokenizer
@st.cache_resource
def load_clip_model():
    """Load the fashion SigLIP model, its eval transform, tokenizer and device.

    Returns:
        Tuple ``(model, preprocess_val, tokenizer, device)``; the model has
        already been moved to ``device`` (CUDA when available, else CPU).
    """
    model, preprocess_train, preprocess_val = open_clip.create_model_and_transforms(
        'hf-hub:Marqo/marqo-fashionSigLIP'
    )
    tokenizer = open_clip.get_tokenizer('hf-hub:Marqo/marqo-fashionSigLIP')
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # NOTE(review): the model is not switched to eval mode here; confirm
    # whether ``model.eval()`` is wanted for inference and matches how the
    # stored embeddings were produced.
    return model, preprocess_val, tokenizer, device


clip_model, preprocess_val, tokenizer, device = load_clip_model()


# Load chromaDB
@st.cache_resource
def load_collection():
    """Open the persistent ChromaDB client and the fashion collection once."""
    db_client = chromadb.PersistentClient(path="./fashion_multimodal_db")
    return db_client, db_client.get_collection(name="fashion_multimodal")


client, collection = load_collection()


# Helper functions
def load_image_from_url(url, max_retries=3):
    """Download ``url`` and return it as an RGB PIL image.

    Args:
        url: Address of the image to download.
        max_retries: Number of attempts; one second is slept between attempts.

    Returns:
        The decoded RGB image, or ``None`` when every attempt failed with a
        request error or the payload could not be identified as an image.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return Image.open(BytesIO(response.content)).convert('RGB')
        except (requests.RequestException, Image.UnidentifiedImageError):
            if attempt < max_retries - 1:
                time.sleep(1)
    return None


def get_image_embedding(image):
    """Return the L2-normalised CLIP image embedding as a flat numpy array."""
    image_tensor = preprocess_val(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = clip_model.encode_image(image_tensor)
        image_features /= image_features.norm(dim=-1, keepdim=True)  # normalise
    return image_features.cpu().numpy().flatten()


# Embedding extraction based on the segmentation mask
def get_segmented_embedding(img, final_mask):
    """Embed ``img`` after painting everything outside ``final_mask`` white.

    Args:
        img: RGB PIL image.
        final_mask: Single-channel mask with the same height/width as ``img``;
            pixels equal to 0 are treated as background.

    Returns:
        Flat numpy array holding the normalised embedding of the masked image.
    """
    img_array = np.array(img)
    final_mask_array = np.array(final_mask)

    # Apply the mask (background becomes white).
    img_array[final_mask_array == 0] = 255
    masked_img = Image.fromarray(img_array)

    # Reuse the shared embedding helper instead of duplicating its body.
    return get_image_embedding(masked_img)


def segment_clothing(img, clothes=None):
    """Segment ``img`` and keep only the requested clothing labels.

    Args:
        img: RGB PIL image.
        clothes: Iterable of segmentation labels to keep. Defaults to
            ``DEFAULT_CLOTHES``.

    Returns:
        Tuple ``(segmented_image, final_mask, detected_categories)`` where
        ``segmented_image`` is ``img`` with non-clothing pixels painted white,
        ``final_mask`` is an ``"L"`` image (255 = clothing, 0 = background)
        and ``detected_categories`` lists the kept labels in pipeline order.
    """
    wanted = DEFAULT_CLOTHES if clothes is None else clothes

    segments = segmenter(img)
    kept = [s for s in segments if s['label'] in wanted]
    detected_categories = [s['label'] for s in kept]

    # Union of all kept masks, built as a boolean array. The original code
    # multiplied a uint8 mask by 255, which wraps around in uint8 whenever
    # the mask pixels are already 255.
    combined = np.zeros((img.height, img.width), dtype=bool)
    for segment in kept:
        combined |= np.array(segment['mask']) > 0

    final_mask = Image.fromarray(combined.astype(np.uint8) * 255)

    # Composite onto white. The original set an alpha channel and then
    # converted back to RGB, which simply dropped the alpha again.
    rgb_img = img.convert("RGB")
    white = Image.new("RGB", rgb_img.size, (255, 255, 255))
    segmented = Image.composite(rgb_img, white, final_mask)
    return segmented, final_mask, detected_categories


def find_similar_images(query_embedding, collection, top_k=5):
    """Return the ``top_k`` stored items most similar to ``query_embedding``.

    All embeddings are fetched from ``collection``, L2-normalised and searched
    with a FAISS inner-product index, so the score is the cosine similarity.

    Args:
        query_embedding: 1-D (or ``(1, dim)``) numpy embedding of the query.
        collection: ChromaDB collection providing ``embeddings``/``metadatas``.
        top_k: Maximum number of results.

    Returns:
        List of ``{'info': metadata, 'similarity': float}`` dicts ordered from
        most to least similar. Empty when the collection holds no embeddings.
    """
    # Fetch embeddings and metadata from ChromaDB.
    all_data = collection.get(include=['embeddings', 'metadatas'])
    all_metadatas = all_data['metadatas']
    stored = all_data['embeddings']
    if stored is None or len(stored) == 0:
        return []

    all_embeddings = np.array(stored, dtype='float32')  # faiss needs float32

    # Inner product on L2-normalised vectors equals cosine similarity.
    faiss.normalize_L2(all_embeddings)
    index = faiss.IndexFlatIP(all_embeddings.shape[1])
    index.add(all_embeddings)

    # np.array copies, so the caller's embedding is not normalised in place.
    query = np.array(query_embedding, dtype='float32').reshape(1, -1)
    faiss.normalize_L2(query)

    k = max(0, min(top_k, index.ntotal))
    if k == 0:
        return []
    scores, indices = index.search(query, k)

    # Look metadata up by the returned index and emit one scalar per hit.
    structured_results = []
    for score, idx in zip(scores[0], indices[0]):
        if idx < 0:  # faiss pads missing neighbours with -1
            continue
        structured_results.append({
            'info': all_metadatas[int(idx)],
            'similarity': float(score),
        })
    return structured_results


# Session state initialisation
_SESSION_DEFAULTS = {
    'step': 'input',
    'query_image_url': '',
    'query_image': None,
    'detections': [],
    'segmented_image': None,
    'final_mask': None,
    'selected_category': None,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        # Copy list defaults so session state never shares the template list.
        st.session_state[_key] = list(_default) if isinstance(_default, list) else _default


def _render_input_step():
    """Ask for an image URL and run clothing detection on request."""
    st.session_state.query_image_url = st.text_input(
        "Enter image URL:", st.session_state.query_image_url
    )
    if not st.button("Detect Clothing"):
        return
    if not st.session_state.query_image_url:
        st.warning("Please enter an image URL.")
        return

    query_image = load_image_from_url(st.session_state.query_image_url)
    if query_image is None:
        st.error("Failed to load the image. Please try another URL.")
        return

    st.session_state.query_image = query_image
    segmented_image, final_mask, detected_categories = segment_clothing(query_image)
    st.session_state.segmented_image = segmented_image
    # Keep the mask: the results step needs it to build the query embedding.
    st.session_state.final_mask = final_mask
    st.session_state.detections = detected_categories
    st.image(segmented_image, caption="Segmented Image", use_column_width=True)
    if st.session_state.detections:
        st.session_state.step = 'select_category'
    else:
        st.warning("No clothing items detected in the image.")


def _render_select_category_step():
    """Show the segmented image and let the user choose a detected category."""
    st.image(
        st.session_state.segmented_image,
        caption="Segmented Image with Detected Categories",
        use_column_width=True,
    )
    st.subheader("Detected Clothing Categories:")
    if not st.session_state.detections:
        st.warning("No categories detected.")
        return

    selected_category = st.selectbox(
        "Select a category to search:", st.session_state.detections
    )
    if st.button("Search Similar Items"):
        st.session_state.selected_category = selected_category
        st.session_state.step = 'show_results'


def _render_results_step():
    """Embed the masked query image and list the most similar catalogue items."""
    original_image = st.session_state.query_image.convert("RGB")
    st.image(original_image, caption="Original Image", use_column_width=True)

    # Embed the query using the stored mask (the original passed the RGB
    # segmented image where the mask was expected).
    query_embedding = get_segmented_embedding(
        st.session_state.query_image, st.session_state.final_mask
    )
    similar_images = find_similar_images(query_embedding, collection)

    st.subheader("Similar Items:")
    for item in similar_images:
        info = item['info']
        col1, col2 = st.columns(2)
        with col1:
            st.image(info['image_url'], use_column_width=True)
        with col2:
            st.write(f"Name: {info['name']}")
            st.write(f"Brand: {info['brand']}")
            category = info.get('category')
            if category:
                st.write(f"Category: {category}")
            st.write(f"Price: {info['price']}")
            st.write(f"Discount: {info['discount']}%")
            st.write(f"Similarity: {item['similarity']:.2f}")

    if st.button("Start New Search"):
        # Reset everything tied to the finished search.
        st.session_state.step = 'input'
        st.session_state.query_image_url = ''
        st.session_state.query_image = None
        st.session_state.detections = []
        st.session_state.segmented_image = None
        st.session_state.final_mask = None
        st.session_state.selected_category = None


# Streamlit app
st.title("Advanced Fashion Search App")

if st.session_state.step == 'input':
    _render_input_step()
elif st.session_state.step == 'select_category':
    _render_select_category_step()
elif st.session_state.step == 'show_results':
    _render_results_step()