Spaces:
Sleeping
Sleeping
import moviepy.editor as mp | |
from flask import Flask, request, jsonify | |
from flask_cors import CORS | |
import requests | |
from io import BytesIO | |
import speech_recognition as sr | |
import io | |
import fitz # PyMuPDF for working with PDFs | |
import numpy as np | |
import cv2 | |
from flask_caching import Cache | |
from utils.audioEmbedding.index import extract_audio_embeddings | |
from utils.videoEmbedding.index import get_video_embedding | |
from utils.imageToText.index import extract_text | |
from utils.sentanceEmbedding.index import get_text_vector , get_text_discription_vector | |
from utils.imageEmbedding.index import get_image_embedding | |
from utils.similarityScore import get_all_similarities | |
from utils.objectDetection.index import detect_objects | |
app = Flask(__name__) | |
cache = Cache(app, config={'CACHE_TYPE': 'simple'}) # You can choose a caching type based on your requirements | |
CORS(app) | |
import moviepy.editor as mp | |
import tempfile | |
def get_face_locations(binary_data): | |
# Convert binary image data to numpy array | |
print(1) | |
nparr = np.frombuffer(binary_data, np.uint8) | |
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
# Load the pre-trained face detection model | |
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') | |
# Convert the image to grayscale | |
gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
# Detect faces in the image | |
faces = face_cascade.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)) | |
# Extract face locations | |
print(2) | |
face_locations = [] | |
for (x, y, w, h) in faces: | |
face_locations.append({"top": y, "right": x + w, "bottom": y + h, "left": x}) | |
print(3) | |
return face_locations | |
def seperate_image_text_from_pdf(pdf_url): | |
# List to store page information | |
try: | |
pages_info = [] | |
# Fetch the PDF from the URL | |
response = requests.get(pdf_url) | |
if response.status_code == 200: | |
# Create a temporary file to save the PDF data | |
with tempfile.NamedTemporaryFile(delete=False) as tmp_file: | |
tmp_file.write(response.content) | |
tmp_file_path = tmp_file.name | |
# Open the PDF | |
pdf = fitz.open(tmp_file_path) | |
# Iterate through each page | |
for page_num in range(len(pdf)): | |
page = pdf.load_page(page_num) | |
# Extract text | |
text = page.get_text() | |
# Count images | |
image_list = page.get_images(full=True) | |
# Convert images to BytesIO and store in a list | |
images_bytes = [] | |
for img_index, img_info in enumerate(image_list): | |
xref = img_info[0] | |
base_image = pdf.extract_image(xref) | |
image_bytes = base_image["image"] | |
images_bytes.append(image_bytes) | |
# Store page information in a dictionary | |
page_info = { | |
"pgno": page_num + 1, | |
"images": images_bytes, | |
"text": text | |
} | |
# Append page information to the list | |
pages_info.append(page_info) | |
# Close the PDF | |
pdf.close() | |
# Clean up the temporary file | |
import os | |
os.unlink(tmp_file_path) | |
else: | |
print("Failed to fetch the PDF from the URL.") | |
except Exception as e: | |
print("An error occurred:", e) | |
return "Error" | |
return pages_info | |
def pdf_image_text_embedding_and_text_embedding(pages_info): | |
try: | |
page_embeddings = [] | |
# Iterate through each page | |
for page in pages_info: | |
# Extract text from the page | |
text = page.get("text", "") | |
images = page.get("images", []) | |
image_embeddings = [] | |
for image in images: | |
try: | |
image_embedding = get_image_embedding(image) | |
extracted_text = extract_text(image) | |
image_embeddings.append({"image_embedding": image_embedding.tolist(), "extracted_text": extracted_text}) | |
except Exception as image_error: | |
print(f"Error processing image: {image_error}") | |
# Log the error or handle it as needed | |
# Store the page embeddings in a dictionary | |
page_embedding = { | |
"images": image_embeddings, | |
"text": text, | |
} | |
page_embeddings.append(page_embedding) | |
return page_embeddings | |
except Exception as e: | |
print(f"An error occurred: {e}") | |
return "Error" | |
def separate_audio_from_video(video_url): | |
try: | |
# Load the video file | |
video = mp.VideoFileClip(video_url) | |
# Extract audio | |
audio = video.audio | |
# Create a temporary file to write the audio data | |
try : | |
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file: | |
temp_audio_filename = temp_audio_file.name | |
# Write the audio data to the temporary file | |
audio.write_audiofile(temp_audio_filename) | |
# Read the audio data from the temporary file as bytes | |
with open(temp_audio_filename, "rb") as f: | |
audio_bytes = f.read() | |
except Exception as e: | |
return "Error" | |
return audio_bytes | |
except Exception as e: | |
print("An error occurred:", e) | |
return "Error" | |
def get_text_embedding_route(): | |
try: | |
text = request.json.get("text") | |
text_embedding = get_text_vector(text) | |
return jsonify({"text_embedding": text_embedding}), 200 | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
def get_audio_embedding_route(): | |
audio_url = request.json.get('audio_url') | |
print(audio_url) | |
response = requests.get(audio_url) | |
audio_data = response.content | |
audio_embedding = extract_audio_embeddings(audio_data) | |
audio_embedding_list = audio_embedding | |
audio_file = BytesIO(audio_data) | |
r = sr.Recognizer() | |
with sr.AudioFile(audio_file) as source: | |
audio_data = r.record(source) | |
extracted_text = "" | |
try: | |
text = r.recognize_google(audio_data) | |
extracted_text = text | |
except Exception as e: | |
print(e) | |
return jsonify({"extracted_text": extracted_text, "audio_embedding": audio_embedding_list}), 200 | |
# Route to get image embeddings | |
def get_image_embedding_route(): | |
try: | |
image_url = request.json.get("imageUrl") | |
print(image_url) | |
response = requests.get(image_url) | |
if response.status_code != 200: | |
return jsonify({"error": "Failed to download image"}), 500 | |
binary_data = response.content | |
extracted_text = extract_text(binary_data) | |
image_embedding = get_image_embedding(binary_data) | |
image_embedding_list = image_embedding.tolist() | |
return jsonify({"image_embedding": image_embedding_list,"extracted_text":extracted_text}), 200 | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
# Route to get video embeddings | |
def get_video_embedding_route(): | |
try: | |
video_url = request.json.get("videoUrl") | |
try: | |
audio_data = separate_audio_from_video(video_url) | |
except Exception as e: | |
return jsonify({"error": "Failed to extract audio from video 1"}), 500 | |
try: | |
audio_embedding = extract_audio_embeddings(audio_data) | |
except Exception as e: | |
return jsonify({"error": "Failed to extract audio embeddings 2 "+e}), 500 | |
audio_embedding_list = audio_embedding | |
try : | |
audio_file = io.BytesIO(audio_data) | |
except Exception as e: | |
return jsonify({"error": "Failed to extract audio embeddings 3"}), 500 | |
try : | |
r = sr.Recognizer() | |
with sr.AudioFile(audio_file) as source: | |
audio_data = r.record(source) | |
except Exception as e: | |
return jsonify({"error": "Failed to extract audio embeddings 4"}), 500 | |
extracted_text = "" | |
try: | |
text = r.recognize_google(audio_data) | |
extracted_text = text | |
except Exception as e: | |
print(e) | |
video_embedding = get_video_embedding(video_url) | |
return jsonify({"video_embedding": video_embedding,"extracted_audio_text": extracted_text, "audio_embedding": audio_embedding_list}), 200 | |
except Exception as e: | |
print(e) | |
return jsonify({"error": str(e)}), 500 | |
def extract_pdf_text_and_embedding(): | |
list = [] | |
try: | |
list.append(1) | |
pdf_url = request.json.get("pdfUrl") | |
list.append(2) | |
print(1) | |
pages_info = "Error" | |
try : | |
pages_info = seperate_image_text_from_pdf(pdf_url) | |
except Exception as e: | |
print(e) | |
return jsonify({"error": "Failed to fetch the PDF from the URL"}), 500 | |
list.append(3) | |
if(pages_info == "Error"): | |
return jsonify({"error": "Failed to fetch the PDF from the URL seperate_image_text_from_pdf "}), 500 | |
list.append(4) | |
print(pages_info) | |
try: | |
content = pdf_image_text_embedding_and_text_embedding(pages_info) | |
except Exception as e1: | |
print(e1) | |
return jsonify({"error": "An error occurred 1 while processing the PDF"}), 500 | |
if content == "Error": | |
return jsonify({"error": "An error occurred 2 while processing the PDF"}), 500 | |
list.append(5) | |
print(content) | |
return jsonify({"content": content}), 200 | |
except Exception as e: | |
print(e) | |
return jsonify({"error": str(list)}), 500 | |
finally: | |
print("kasi",list) | |
# Route to get text description embeddings | |
def get_text_description_embedding_route(): | |
try: | |
text = request.json.get("text") | |
text_description_embedding = get_text_discription_vector(text) | |
return jsonify({"text_description_embedding": text_description_embedding.tolist()}), 200 | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
# Route to get object detection results | |
def detect_objects_route(): | |
try: | |
image_url = request.json.get("imageUrl") | |
response = requests.get(image_url) | |
if response.status_code != 200: | |
return jsonify({"error": "Failed to download image"}), 500 | |
binary_data = response.content | |
object_detection_results = detect_objects(binary_data) | |
return jsonify({"object_detection_results": object_detection_results}), 200 | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 | |
# Route to get face locations | |
def get_face_locations_route(): | |
try: | |
image_url = request.json.get("imageUrl") | |
response = requests.get(image_url) | |
print(11) | |
if response.status_code != 200: | |
return jsonify({"error": "Failed to download image"}), 500 | |
print(22) | |
binary_data = response.content | |
face_locations = get_face_locations(binary_data) | |
print(33) | |
print("ok",face_locations) | |
return jsonify({"face_locations": str(face_locations)}), 200 | |
except Exception as e: | |
print(e) | |
return jsonify({"error": str(e)}), 500 | |
# Route to get similarity score | |
def get_similarity_score_route(): | |
try: | |
embedding1 = request.json.get("embedding1") | |
embedding2 = request.json.get("embedding2") | |
# Assuming embeddings are provided as lists | |
similarity_score = get_all_similarities(embedding1, embedding2) | |
return jsonify({"similarity_score": similarity_score}), 200 | |
except Exception as e: | |
return jsonify({"error": str(e)}), 500 |