"""Flask service exposing embedding, text-extraction and detection endpoints.

Each POST route reads a JSON body, optionally downloads the media referenced
by a URL in that body, hands the bytes (or the URL) to a helper from the
project's ``utils`` package, and returns the result as JSON.  The ``utils``
helpers are opaque from this module; their return types are only constrained
by how the routes serialise them (``.tolist()`` where the original code used
it, passed through unchanged otherwise).
"""
import hashlib
import io
import logging
import os
import tempfile
from io import BytesIO

import cv2
import fitz  # PyMuPDF for working with PDFs
import moviepy.editor as mp
import numpy as np
import requests
import speech_recognition as sr
from flask import Flask, request, jsonify
from flask_caching import Cache
from flask_cors import CORS

from utils.audioEmbedding.index import extract_audio_embeddings
from utils.videoEmbedding.index import get_video_embedding
from utils.imageToText.index import extract_text
from utils.sentanceEmbedding.index import get_text_vector, get_text_discription_vector
from utils.imageEmbedding.index import get_image_embedding
from utils.similarityScore import get_all_similarities
from utils.objectDetection.index import detect_objects

logger = logging.getLogger(__name__)

# Upper bound (seconds) on connect/read waits for outbound downloads, so a
# stalled remote server cannot block a worker forever.
REQUEST_TIMEOUT_SECONDS = 30

# Lifetime (seconds) of a cached successful response.
CACHE_TIMEOUT_SECONDS = 300

app = Flask(__name__)
# You can choose a caching type based on your requirements
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
CORS(app)


def _request_cache_key():
    """Build a cache key from the request path and a digest of its body.

    The routes are POST endpoints whose result depends on the JSON payload,
    so keying on the path alone would hand one caller's result to another.
    ``get_data()`` keeps the body cached on the request, so ``request.json``
    still works afterwards.
    """
    body_digest = hashlib.sha256(request.get_data()).hexdigest()
    return f"view/{request.path}/{body_digest}"


def _is_successful_response(rv):
    """Return True only for ``(response, 200)`` view results.

    Used as the cache's response filter so that error responses (for example
    a transient download failure) are not replayed for the cache lifetime.
    """
    return isinstance(rv, tuple) and len(rv) == 2 and rv[1] == 200


def _cached_view(view):
    """Wrap *view* with response caching keyed on path and request body.

    Must be applied *below* ``@app.route`` so that Flask registers the
    caching wrapper rather than the bare view function.
    """
    return cache.cached(
        timeout=CACHE_TIMEOUT_SECONDS,
        key_prefix=_request_cache_key,
        response_filter=_is_successful_response,
    )(view)


def _download(url):
    """Fetch *url* and return the body bytes, or ``None`` on a non-200 status.

    Network-level failures (and an invalid/missing URL) propagate as
    exceptions from ``requests``.
    """
    response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        return None
    return response.content


def _transcribe(audio_bytes):
    """Return the Google speech-recognition transcript of *audio_bytes*.

    Opening/decoding the audio may raise (the caller decides how to report
    that); a failure of the recognition call itself is treated as
    best-effort and yields an empty string.
    """
    recognizer = sr.Recognizer()
    with sr.AudioFile(BytesIO(audio_bytes)) as source:
        recorded = recognizer.record(source)
    try:
        return recognizer.recognize_google(recorded)
    except Exception as exc:  # best-effort: transcription is optional output
        logger.warning("Speech recognition failed: %r", exc)
        return ""


def get_face_locations(binary_data):
    """Detect frontal faces in an encoded image.

    Args:
        binary_data: Encoded image bytes (any format ``cv2.imdecode`` reads).

    Returns:
        A list of dicts with integer ``top``, ``right``, ``bottom`` and
        ``left`` pixel coordinates, one per detected face.

    Raises:
        ValueError: If the bytes cannot be decoded as an image.
    """
    # Convert binary image data to numpy array
    encoded = np.frombuffer(binary_data, np.uint8)
    image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals failure by returning None rather than raising.
        raise ValueError("Could not decode image data")

    # Load the pre-trained face detection model
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    )

    # The Haar cascade operates on a single-channel image.
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(
        gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
    )

    # Cast to plain ints: NumPy integers are not JSON serialisable and their
    # repr differs between NumPy major versions.
    return [
        {
            "top": int(y),
            "right": int(x + w),
            "bottom": int(y + h),
            "left": int(x),
        }
        for (x, y, w, h) in faces
    ]


def seperate_image_text_from_pdf(pdf_url):
    """Download a PDF and split every page into its text and embedded images.

    Args:
        pdf_url: URL of the PDF document.

    Returns:
        A list with one dict per page: ``pgno`` (1-based page number),
        ``images`` (list of raw image bytes) and ``text`` (page text).
        The list is empty when the download does not return HTTP 200.
    """
    pages_info = []

    response = requests.get(pdf_url, timeout=REQUEST_TIMEOUT_SECONDS)
    if response.status_code != 200:
        logger.error("Failed to fetch the PDF from the URL.")
        return pages_info

    # PyMuPDF is given a path, so persist the download to a temporary file.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(response.content)
        tmp_file_path = tmp_file.name

    try:
        pdf = fitz.open(tmp_file_path)
        try:
            for page_number, page in enumerate(pdf, start=1):
                # The first element of each image entry is its xref, which
                # extract_image() resolves to the raw image payload.
                images_bytes = [
                    pdf.extract_image(img_info[0])["image"]
                    for img_info in page.get_images(full=True)
                ]
                pages_info.append({
                    "pgno": page_number,
                    "images": images_bytes,
                    "text": page.get_text(),
                })
        finally:
            pdf.close()
    finally:
        # Always remove the temporary file, even if parsing failed.
        os.unlink(tmp_file_path)

    return pages_info


def pdf_image_text_embedding_and_text_embedding(pages_info):
    """Embed and OCR every image of every page, keeping the page text.

    Args:
        pages_info: Page dicts as produced by
            ``seperate_image_text_from_pdf`` (``images`` and ``text`` keys
            are read).

    Returns:
        A list with one dict per page: ``images`` (list of dicts holding
        ``image_embedding`` as a list and ``extracted_text``) and ``text``
        (the page text, unchanged; no text embedding is computed here).
    """
    page_embeddings = []
    for page in pages_info:
        image_embeddings = []
        for image in page["images"]:
            image_embedding = get_image_embedding(image)
            extracted_text = extract_text(image)
            image_embeddings.append({
                "image_embedding": image_embedding.tolist(),
                "extracted_text": extracted_text,
            })
        page_embeddings.append({
            "images": image_embeddings,
            "text": page["text"],
        })
    return page_embeddings


def separate_audio_from_video(video_url):
    """Extract the audio track of a video as WAV bytes.

    Args:
        video_url: Location of the video, passed to ``VideoFileClip``.

    Returns:
        The WAV file contents as bytes, or ``None`` if anything fails
        (including a video without an audio track); the failure is logged.
    """
    video = None
    temp_audio_filename = None
    try:
        video = mp.VideoFileClip(video_url)
        audio = video.audio
        if audio is None:
            raise ValueError("Video has no audio track")

        # Reserve a file name, then close the handle before the audio is
        # written to that path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_filename = temp_audio_file.name
        audio.write_audiofile(temp_audio_filename)

        with open(temp_audio_filename, "rb") as f:
            return f.read()
    except Exception as e:
        logger.error("An error occurred: %s", e)
        return None
    finally:
        # Release the clip's readers and remove the temporary WAV so
        # repeated requests do not accumulate files or handles.
        if video is not None:
            video.close()
        if temp_audio_filename is not None and os.path.exists(temp_audio_filename):
            os.unlink(temp_audio_filename)


@app.route('/get_text_embedding', methods=['POST'])
@_cached_view
def get_text_embedding_route():
    """Return the embedding of the ``text`` field of the JSON body."""
    try:
        text = request.json.get("text")
        text_embedding = get_text_vector(text)
        return jsonify({"text_embedding": text_embedding}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/extract_audio_text_and_embedding', methods=['POST'])
@_cached_view
def get_audio_embedding_route():
    """Download ``audio_url`` and return its embedding and transcript."""
    try:
        audio_url = request.json.get('audio_url')
        logger.info("Processing audio: %s", audio_url)
        audio_data = _download(audio_url)
        if audio_data is None:
            return jsonify({"error": "Failed to download audio"}), 500

        audio_embedding = extract_audio_embeddings(audio_data)
        extracted_text = _transcribe(audio_data)
        return jsonify({"extracted_text": extracted_text, "audio_embedding": audio_embedding}), 200
    except Exception as e:
        logger.exception("Audio processing failed")
        return jsonify({"error": str(e)}), 500


# Route to get image embeddings
@app.route('/extract_image_text_and_embedding', methods=['POST'])
@_cached_view
def get_image_embedding_route():
    """Download ``imageUrl`` and return its embedding and extracted text."""
    try:
        image_url = request.json.get("imageUrl")
        logger.info("Processing image: %s", image_url)
        binary_data = _download(image_url)
        if binary_data is None:
            return jsonify({"error": "Failed to download image"}), 500

        extracted_text = extract_text(binary_data)
        image_embedding = get_image_embedding(binary_data)
        return jsonify({"image_embedding": image_embedding.tolist(), "extracted_text": extracted_text}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Route to get video embeddings
@app.route('/extract_video_text_and_embedding', methods=['POST'])
@_cached_view
def get_video_embedding_route():
    """Return video embedding, audio embedding and transcript for ``videoUrl``."""
    try:
        video_url = request.json.get("videoUrl")
        audio_data = separate_audio_from_video(video_url)
        if audio_data is None:
            # separate_audio_from_video already logged the underlying cause.
            return jsonify({"error": "Failed to extract audio from video"}), 500

        audio_embedding = extract_audio_embeddings(audio_data)
        extracted_text = _transcribe(audio_data)
        video_embedding = get_video_embedding(video_url)
        return jsonify({
            "video_embedding": video_embedding,
            "extracted_audio_text": extracted_text,
            "audio_embedding": audio_embedding,
        }), 200
    except Exception as e:
        logger.exception("Video processing failed")
        return jsonify({"error": str(e)}), 500


@app.route('/extract_pdf_text_and_embedding', methods=['POST'])
@_cached_view
def extract_pdf_text_and_embedding():
    """Return per-page text and image embeddings for the PDF at ``pdfUrl``."""
    try:
        pdf_url = request.json.get("pdfUrl")
        pages_info = seperate_image_text_from_pdf(pdf_url)
        content = pdf_image_text_embedding_and_text_embedding(pages_info)
        return jsonify({"content": content}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Route to get text description embeddings
@app.route('/getTextDescriptionEmbedding', methods=['POST'])
@_cached_view
def get_text_description_embedding_route():
    """Return the description embedding of the ``text`` field as a list."""
    try:
        text = request.json.get("text")
        text_description_embedding = get_text_discription_vector(text)
        return jsonify({"text_description_embedding": text_description_embedding.tolist()}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Route to get object detection results
@app.route('/detectObjects', methods=['POST'])
@_cached_view
def detect_objects_route():
    """Download ``imageUrl`` and return the object detection results."""
    try:
        image_url = request.json.get("imageUrl")
        binary_data = _download(image_url)
        if binary_data is None:
            return jsonify({"error": "Failed to download image"}), 500

        object_detection_results = detect_objects(binary_data)
        return jsonify({"object_detection_results": object_detection_results}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# Route to get face locations
@app.route('/getFaceLocations', methods=['POST'])
@_cached_view
def get_face_locations_route():
    """Download ``imageUrl`` and return the detected face boxes.

    ``face_locations`` is returned as the string form of the list, which is
    the response shape existing clients receive.
    """
    try:
        image_url = request.json.get("imageUrl")
        binary_data = _download(image_url)
        if binary_data is None:
            return jsonify({"error": "Failed to download image"}), 500

        face_locations = get_face_locations(binary_data)
        logger.info("Detected %d face(s)", len(face_locations))
        return jsonify({"face_locations": str(face_locations)}), 200
    except Exception as e:
        logger.exception("Face detection failed")
        return jsonify({"error": str(e)}), 500


# Route to get similarity score
@app.route('/getSimilarityScore', methods=['POST'])
@_cached_view
def get_similarity_score_route():
    """Return the similarity of ``embedding1`` and ``embedding2``."""
    try:
        embedding1 = request.json.get("embedding1")
        embedding2 = request.json.get("embedding2")
        # Assuming embeddings are provided as lists
        similarity_score = get_all_similarities(embedding1, embedding2)
        return jsonify({"similarity_score": similarity_score}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/')
def hello():
    """Simple liveness endpoint."""
    return 'Hello, World!'