hprasath's picture
Upload 2 files
95760fd verified
raw
history blame
10.9 kB
import moviepy.editor as mp
from flask import Flask, request, jsonify
from flask_cors import CORS
import requests
from io import BytesIO
import speech_recognition as sr
import io
import fitz # PyMuPDF for working with PDFs
import numpy as np
import cv2
from flask_caching import Cache
from utils.audioEmbedding.index import extract_audio_embeddings
from utils.videoEmbedding.index import get_video_embedding
from utils.imageToText.index import extract_text
from utils.sentanceEmbedding.index import get_text_vector , get_text_discription_vector
from utils.imageEmbedding.index import get_image_embedding
from utils.similarityScore import get_all_similarities
from utils.objectDetection.index import detect_objects
# Application object shared by every route in this module.
app = Flask(__name__)
# Response cache bound to the app; the backend is selected by CACHE_TYPE and
# can be swapped for another Flask-Caching backend as requirements change.
cache = Cache(
    app,
    config={"CACHE_TYPE": "simple"},
)
# Enable cross-origin requests using the flask-cors defaults.
CORS(app)
import moviepy.editor as mp
import tempfile
def get_face_locations(binary_data):
    """Detect frontal faces in an encoded image.

    Args:
        binary_data: Encoded image bytes to be decoded with ``cv2.imdecode``.

    Returns:
        A list with one dict per detected face, each holding the ``top``,
        ``right``, ``bottom`` and ``left`` pixel coordinates as plain ``int``
        values. The list is empty when no face is detected.

    Raises:
        ValueError: If ``binary_data`` is empty or cannot be decoded.
    """
    if not binary_data:
        raise ValueError("No image data provided")

    # Convert binary image data to a numpy array and decode it.
    pixels = np.frombuffer(binary_data, np.uint8)
    image = cv2.imdecode(pixels, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode reports undecodable input by returning None; fail here
        # with a clear message instead of deep inside cvtColor.
        raise ValueError("Could not decode image data")

    # Load the pre-trained Haar cascade shipped with OpenCV.
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

    # detectMultiScale yields NumPy scalars; cast to int so the result is
    # JSON-serialisable and prints the same regardless of NumPy version.
    return [
        {"top": int(y), "right": int(x + w), "bottom": int(y + h), "left": int(x)}
        for (x, y, w, h) in faces
    ]
def seperate_image_text_from_pdf(pdf_url):
    """Download a PDF and split every page into its text and embedded images.

    Args:
        pdf_url: URL the PDF is fetched from with an HTTP GET.

    Returns:
        A list with one dict per page containing ``pgno`` (1-based page
        number), ``images`` (list of raw image bytes) and ``text``. The list
        is empty when the download does not return HTTP 200.

    Raises:
        requests.RequestException: If the download fails or times out.
        Exception: Whatever PyMuPDF raises for an unreadable document.
    """
    import os

    pages_info = []

    # A timeout keeps an unresponsive host from blocking the worker forever.
    response = requests.get(pdf_url, timeout=30)
    if response.status_code != 200:
        print("Failed to fetch the PDF from the URL.")
        return pages_info

    # The PDF is opened from a temporary file on disk.
    tmp_file = tempfile.NamedTemporaryFile(delete=False)
    tmp_file_path = tmp_file.name
    try:
        with tmp_file:
            tmp_file.write(response.content)

        pdf = fitz.open(tmp_file_path)
        try:
            for page_num in range(len(pdf)):
                page = pdf.load_page(page_num)
                text = page.get_text()
                # The first item of each image record is the xref used to
                # pull the raw image bytes out of the document.
                images_bytes = [
                    pdf.extract_image(img_info[0])["image"]
                    for img_info in page.get_images(full=True)
                ]
                pages_info.append({
                    "pgno": page_num + 1,
                    "images": images_bytes,
                    "text": text,
                })
        finally:
            pdf.close()
    finally:
        # Always remove the temporary file, even when parsing fails.
        os.unlink(tmp_file_path)

    return pages_info
def _embed_pdf_image(image_bytes):
    """Build the embedding/extracted-text record for one page image."""
    vector = get_image_embedding(image_bytes)
    image_text = extract_text(image_bytes)
    return {"image_embedding": vector.tolist(), "extracted_text": image_text}


def _embed_pdf_page(page_entry):
    """Build the output record for one page: image records plus page text."""
    page_text = page_entry["text"]
    image_records = [_embed_pdf_image(raw) for raw in page_entry["images"]]
    return {
        "images": image_records,
        "text": page_text,
    }


def pdf_image_text_embedding_and_text_embedding(pages_info):
    """Attach an embedding and extracted text to every image of every page.

    Args:
        pages_info: Iterable of page dicts, each with a ``text`` entry and an
            ``images`` entry (iterable of image payloads).

    Returns:
        A list with one dict per input page: ``images`` is a list of
        ``{"image_embedding", "extracted_text"}`` dicts and ``text`` is the
        page text passed through unchanged.
    """
    return [_embed_pdf_page(page_entry) for page_entry in pages_info]
def separate_audio_from_video(video_url):
    """Extract the audio track of a video as WAV bytes.

    Args:
        video_url: Location of the video, passed to ``mp.VideoFileClip``.

    Returns:
        The bytes of the WAV file written from the video's audio track, or
        ``None`` when anything goes wrong (the error is printed).
    """
    import os

    video = None
    temp_audio_filename = None
    try:
        video = mp.VideoFileClip(video_url)
        audio = video.audio
        if audio is None:
            raise ValueError("video has no audio track")

        # Reserve a temporary path and close the handle before the audio is
        # written to that path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_filename = temp_audio_file.name
        audio.write_audiofile(temp_audio_filename)

        with open(temp_audio_filename, "rb") as f:
            return f.read()
    except Exception as e:
        print("An error occurred:", e)
        return None
    finally:
        # Release the clip and remove the temporary WAV on every path so
        # repeated requests do not accumulate open clips or files on disk.
        if video is not None:
            video.close()
        if temp_audio_filename is not None and os.path.exists(temp_audio_filename):
            os.unlink(temp_audio_filename)
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/get_text_embedding', methods=['POST'])
def get_text_embedding_route():
    """Return the embedding of the ``text`` field of the JSON body.

    Responds 200 with ``{"text_embedding": ...}``, 400 when ``text`` is
    missing or the body is not a JSON object, and 500 with ``{"error": ...}``
    when computing the embedding raises.
    """
    payload = request.get_json(silent=True)
    text = payload.get("text") if isinstance(payload, dict) else None
    if text is None:
        return jsonify({"error": "Missing 'text' in request body"}), 400
    try:
        text_embedding = get_text_vector(text)
        return jsonify({"text_embedding": text_embedding}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/extract_audio_text_and_embedding', methods=['POST'])
def get_audio_embedding_route():
    """Download an audio file and return its embedding and transcript.

    Reads ``audio_url`` from the JSON body. Responds 200 with
    ``{"extracted_text", "audio_embedding"}``, 400 when ``audio_url`` is
    missing, and 500 with ``{"error": ...}`` when the download or processing
    fails. A failed transcription is not an error: ``extracted_text`` is then
    the empty string.
    """
    payload = request.get_json(silent=True)
    audio_url = payload.get('audio_url') if isinstance(payload, dict) else None
    if not audio_url:
        return jsonify({"error": "Missing 'audio_url' in request body"}), 400
    try:
        # NOTE(security): the URL is caller-supplied and fetched server-side;
        # restrict allowed hosts if this service is reachable by untrusted
        # clients.
        response = requests.get(audio_url, timeout=30)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download audio"}), 500
        audio_bytes = response.content

        audio_embedding = extract_audio_embeddings(audio_bytes)

        recognizer = sr.Recognizer()
        with sr.AudioFile(BytesIO(audio_bytes)) as source:
            recorded = recognizer.record(source)

        extracted_text = ""
        try:
            extracted_text = recognizer.recognize_google(recorded)
        except Exception as e:
            # Transcription is best effort; keep the embedding result.
            print(e)

        return jsonify({"extracted_text": extracted_text, "audio_embedding": audio_embedding}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
# Route to get image embeddings
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/extract_image_text_and_embedding', methods=['POST'])
def get_image_embedding_route():
    """Download an image and return its embedding and extracted text.

    Reads ``imageUrl`` from the JSON body. Responds 200 with
    ``{"image_embedding", "extracted_text"}``, 400 when ``imageUrl`` is
    missing, and 500 with ``{"error": ...}`` when the download or processing
    fails.
    """
    payload = request.get_json(silent=True)
    image_url = payload.get("imageUrl") if isinstance(payload, dict) else None
    if not image_url:
        return jsonify({"error": "Missing 'imageUrl' in request body"}), 400
    try:
        # NOTE(security): the URL is caller-supplied and fetched server-side;
        # restrict allowed hosts if this service is reachable by untrusted
        # clients.
        response = requests.get(image_url, timeout=30)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500
        binary_data = response.content
        extracted_text = extract_text(binary_data)
        image_embedding = get_image_embedding(binary_data)
        return jsonify({"image_embedding": image_embedding.tolist(),"extracted_text":extracted_text}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Route to get video embeddings
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/extract_video_text_and_embedding', methods=['POST'])
def get_video_embedding_route():
    """Return video and audio embeddings plus the audio transcript of a video.

    Reads ``videoUrl`` from the JSON body. Responds 200 with
    ``{"video_embedding", "extracted_audio_text", "audio_embedding"}``, 400
    when ``videoUrl`` is missing, and 500 with ``{"error": ...}`` when audio
    extraction or any processing step fails. A failed transcription is not an
    error: ``extracted_audio_text`` is then the empty string.
    """
    payload = request.get_json(silent=True)
    video_url = payload.get("videoUrl") if isinstance(payload, dict) else None
    if not video_url:
        return jsonify({"error": "Missing 'videoUrl' in request body"}), 400
    try:
        audio_bytes = separate_audio_from_video(video_url)
        if audio_bytes is None:
            # The helper returns None on failure; stop here with a clear
            # message instead of feeding None to the embedding step.
            return jsonify({"error": "Failed to extract audio from video"}), 500

        audio_embedding = extract_audio_embeddings(audio_bytes)

        recognizer = sr.Recognizer()
        with sr.AudioFile(io.BytesIO(audio_bytes)) as source:
            recorded = recognizer.record(source)

        extracted_text = ""
        try:
            extracted_text = recognizer.recognize_google(recorded)
        except Exception as e:
            # Transcription is best effort; keep the embedding results.
            print(e)

        video_embedding = get_video_embedding(video_url)
        return jsonify({"video_embedding": video_embedding,"extracted_audio_text": extracted_text, "audio_embedding": audio_embedding}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/extract_pdf_text_and_embedding', methods=['POST'])
def extract_pdf_text_and_embedding():
    """Return per-page text and image embeddings for a PDF.

    Reads ``pdfUrl`` from the JSON body. Responds 200 with
    ``{"content": [...]}`` (one entry per page), 400 when ``pdfUrl`` is
    missing, and 500 with ``{"error": ...}`` when processing raises.
    """
    payload = request.get_json(silent=True)
    pdf_url = payload.get("pdfUrl") if isinstance(payload, dict) else None
    if not pdf_url:
        return jsonify({"error": "Missing 'pdfUrl' in request body"}), 400
    try:
        pages_info = seperate_image_text_from_pdf(pdf_url)
        content = pdf_image_text_embedding_and_text_embedding(pages_info)
        # The result is no longer printed: it holds full embedding vectors
        # for every image and floods the log.
        return jsonify({"content": content}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Route to get text description embeddings
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/getTextDescriptionEmbedding', methods=['POST'])
def get_text_description_embedding_route():
    """Return the description embedding of the ``text`` field of the JSON body.

    Responds 200 with ``{"text_description_embedding": [...]}``, 400 when
    ``text`` is missing or the body is not a JSON object, and 500 with
    ``{"error": ...}`` when computing the embedding raises.
    """
    payload = request.get_json(silent=True)
    text = payload.get("text") if isinstance(payload, dict) else None
    if text is None:
        return jsonify({"error": "Missing 'text' in request body"}), 400
    try:
        text_description_embedding = get_text_discription_vector(text)
        return jsonify({"text_description_embedding": text_description_embedding.tolist()}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Route to get object detection results
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/detectObjects', methods=['POST'])
def detect_objects_route():
    """Download an image and return the object detection results for it.

    Reads ``imageUrl`` from the JSON body. Responds 200 with
    ``{"object_detection_results": ...}``, 400 when ``imageUrl`` is missing,
    and 500 with ``{"error": ...}`` when the download or detection fails.
    """
    payload = request.get_json(silent=True)
    image_url = payload.get("imageUrl") if isinstance(payload, dict) else None
    if not image_url:
        return jsonify({"error": "Missing 'imageUrl' in request body"}), 400
    try:
        # NOTE(security): the URL is caller-supplied and fetched server-side;
        # restrict allowed hosts if this service is reachable by untrusted
        # clients.
        response = requests.get(image_url, timeout=30)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500
        object_detection_results = detect_objects(response.content)
        return jsonify({"object_detection_results": object_detection_results}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# Route to get face locations
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/getFaceLocations', methods=['POST'])
def get_face_locations_route():
    """Download an image and return the locations of the faces found in it.

    Reads ``imageUrl`` from the JSON body. Responds 200 with
    ``{"face_locations": "<stringified list of dicts>"}``, 400 when
    ``imageUrl`` is missing, and 500 with ``{"error": ...}`` when the download
    or detection fails.
    """
    payload = request.get_json(silent=True)
    image_url = payload.get("imageUrl") if isinstance(payload, dict) else None
    if not image_url:
        return jsonify({"error": "Missing 'imageUrl' in request body"}), 400
    try:
        # NOTE(security): the URL is caller-supplied and fetched server-side;
        # restrict allowed hosts if this service is reachable by untrusted
        # clients.
        response = requests.get(image_url, timeout=30)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500

        # Cast every coordinate to a plain int: the value is stringified
        # below, and NumPy scalars would render as e.g. "np.int32(12)" on
        # newer NumPy releases.
        face_locations = [
            {side: int(value) for side, value in location.items()}
            for location in get_face_locations(response.content)
        ]
        # The list is returned as a string (not a JSON array) to keep the
        # response shape existing clients already receive.
        return jsonify({"face_locations": str(face_locations)}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
# Route to get similarity score
# NOTE: the former `@cache.cached` sat above `@app.route`, so Flask registered
# the uncached function and the cache never ran. It is removed rather than
# reordered: the default cache key is the request path, which would return one
# caller's result for every other POST body.
@app.route('/getSimilarityScore', methods=['POST'])
def get_similarity_score_route():
    """Return the similarity between two embeddings from the JSON body.

    Reads ``embedding1`` and ``embedding2``. Responds 200 with
    ``{"similarity_score": ...}``, 400 when either embedding is missing, and
    500 with ``{"error": ...}`` when the comparison raises.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    embedding1 = payload.get("embedding1")
    embedding2 = payload.get("embedding2")
    if embedding1 is None or embedding2 is None:
        return jsonify({"error": "Both 'embedding1' and 'embedding2' are required"}), 400
    try:
        similarity_score = get_all_similarities(embedding1, embedding2)
        return jsonify({"similarity_score": similarity_score}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/')
def hello():
    """Root endpoint: reply with a fixed greeting string."""
    greeting = 'Hello, World!'
    return greeting