import pickle

import cv2
import numpy as np
import streamlit as st
import tensorflow as tf
from PIL import Image
from keras.models import load_model
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
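
# End-to-end pipeline: extract a keyframe from the selected video, classify
# it with the DenseNet201 model, and caption it with the encoder-decoder model.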


def label_smoothing(y_true, y_pred):
    """Binary cross-entropy with label smoothing, the custom loss of the classifier."""
    return tf.keras.losses.binary_crossentropy(y_true, y_pred, label_smoothing=0.1)


def sparse_cross_entropy(y_true, y_pred):
    """Sparse softmax cross-entropy averaged over the batch, the custom loss of the decoder."""
    loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y_true,
                                                          logits=y_pred)
    loss_mean = tf.reduce_mean(loss)
    return loss_mean


# The custom losses must be passed as custom_objects so Keras can
# deserialize the models that were compiled with them.
model1 = load_model('densenet.h5', custom_objects={'label_smoothing': label_smoothing})
image_model_transfer = load_model("image_model_transfer.h5")
decoder_model = load_model("Final_ISRO_DenseNet201_Epoch50.h5",
                           custom_objects={'sparse_cross_entropy': sparse_cross_entropy})
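
# The three .h5 model files are expected in the working directory;
# load_model raises an error if any of them is missing.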


class TokenizerWrap(Tokenizer):
    """Wrap the Tokenizer class from Keras with more functionality."""

    def __init__(self, texts, num_words=None):
        """
        :param texts: List of strings with the data-set.
        :param num_words: Max number of words to use.
        """
        Tokenizer.__init__(self, num_words=num_words)

        # Build the vocabulary from the given texts.
        self.fit_on_texts(texts)

        # Inverse lookup-table: integer token -> word.
        self.index_to_word = dict(zip(self.word_index.values(),
                                      self.word_index.keys()))

    def token_to_word(self, token):
        """Lookup a single word from an integer-token."""
        word = " " if token == 0 else self.index_to_word[token]
        return word

    def tokens_to_string(self, tokens):
        """Convert a list of integer-tokens to a string."""
        words = [self.index_to_word[token]
                 for token in tokens
                 if token != 0]
        text = " ".join(words)
        return text

    def captions_to_tokens(self, captions_listlist):
        """
        Convert a list-of-lists of text-captions to
        a list-of-lists of integer-tokens.
        """
        tokens = [self.texts_to_sequences(captions_list)
                  for captions_list in captions_listlist]
        return tokens


# Load the training labels and the fitted tokenizer saved during training.
with open('Train_Label.pickle', 'rb') as efile:
    labels = pickle.load(efile)
with open('tokenizer.pkl', 'rb') as efile:
    tokenizer = pickle.load(efile)

le = LabelEncoder()
labels = le.fit_transform(labels)
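# le.inverse_transform can now map predicted class indices back to their
# original string labels (used when displaying results in main()).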


def framing(video):
    """Read a video and return its Canny edge maps along with the original frames."""
    fr = []
    fr_pre = []
    cap = cv2.VideoCapture(video)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        grayed = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        canned = cv2.Canny(grayed, 320, 320)
        fr.append(frame)
        fr_pre.append(canned)
    cap.release()
    return fr_pre, fr


def difference_of_frames(frames):
    """Absolute difference between every pair of consecutive frames."""
    diff = []
    for i in range(len(frames) - 1):
        diff.append(cv2.absdiff(frames[i], frames[i + 1]))
    return diff


def cal_threshold(diff):
    """Global keyframe threshold: mean plus 4 standard deviations of all frame differences."""
    mn = np.mean(diff)
    st_d = np.std(diff)
    a = 4
    ts = mn + (a * st_d)
    return ts


def imp_frames(diff, ts, ogframes):
    """Keep the original frames whose own mean + 4*std score exceeds the global threshold."""
    a_fr = []
    for i in range(len(diff)):
        mn = np.mean(diff[i])
        st_d = np.std(diff[i])
        fr_ts = mn + (4 * st_d)
        a_fr.append([i, fr_ts])
    imp_fr = []
    for i, ac_tr in a_fr:
        if ac_tr >= ts:
            imp_fr.append([i, ac_tr])
    key_fr = []
    for i, _ in imp_fr:
        key_fr.append(ogframes[i])
    return key_fr


def final_image(video):
    """Extract the middle keyframe of a video, save it to disk, and return its path."""
    frames, ogframes = framing(video)
    diff = difference_of_frames(frames)
    ts = cal_threshold(diff)
    key_fr = imp_frames(diff, ts, ogframes)
    keyframe = key_fr[len(key_fr) // 2]
    cv2.imwrite("Testing1.jpg", keyframe)
    return "Testing1.jpg"


def image_test(image_path):
    """Load an image as a (1, 224, 224, 3) batch for the classifier."""
    image = Image.open(image_path)
    image = image.resize((224, 224))
    image = np.array(image)
    image = np.expand_dims(image, axis=0)
    return image


def largest_indices(ary, n):
    """Return the indices of the n largest values of ary, sorted in descending order."""
    flat = ary.flatten()
    indices = np.argpartition(flat, -n)[-n:]
    indices = indices[np.argsort(-flat[indices])]
    return indices
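
# Example: largest_indices(np.array([[0.1, 0.7, 0.2]]), 2) returns
# array([1, 2]); index 1 (value 0.7) first, then index 2 (value 0.2).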


# Marker words that wrap every training caption; token_start seeds decoding
# and token_end terminates it.
mark_start = 'ssss'
mark_end = ' eeee'

token_start = tokenizer.word_index[mark_start.strip()]
token_end = tokenizer.word_index[mark_end.strip()]


def load_image(path, size=None):
    """
    Load the image from the given file-path and resize it
    to the given size if not None.
    """
    img = Image.open(path)

    if size is not None:
        img = img.resize(size=size, resample=Image.LANCZOS)

    # Scale pixel values to [0, 1].
    img = np.array(img)
    img = img / 255.0

    # Expand grayscale images to 3 channels.
    if len(img.shape) == 2:
        img = np.repeat(img[:, :, np.newaxis], 3, axis=2)
    return img


def greedy_search(image_path, max_tokens=30):
    """
    Generate a caption for the image in the given path.
    The caption is limited to the given number of tokens (words).
    """
    image = load_image(image_path, size=(224, 224))

    # The encoder expects a batch, so add a batch dimension.
    image_batch = np.expand_dims(image, axis=0)
    transfer_values = image_model_transfer.predict(image_batch)

    # Pre-allocate the decoder's input sequence; it is filled with one
    # token per iteration, starting from the start marker.
    shape = (1, max_tokens)
    decoder_input_data = np.zeros(shape=shape, dtype=int)

    token_int = token_start
    output_text = ''
    count_tokens = 0

    while token_int != token_end and count_tokens < max_tokens:
        decoder_input_data[0, count_tokens] = token_int

        x_data = {
            'transfer_values_input': transfer_values,
            'decoder_input': decoder_input_data
        }

        decoder_output = decoder_model.predict(x_data)

        # Greedy decoding: always pick the highest-scoring next token.
        token_onehot = decoder_output[0, count_tokens, :]
        token_int = np.argmax(token_onehot)

        sampled_word = tokenizer.token_to_word(token_int)
        output_text += " " + sampled_word
        count_tokens += 1

    # Drop the final word, which is the end marker when the loop
    # terminated normally.
    predicted_caption = output_text.split()
    del predicted_caption[-1]
    return predicted_caption
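
# greedy_search above commits to the single best token at every step;
# beam_search below instead keeps several candidate captions in parallel
# and returns the highest-scoring one.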


def beam_search(beam_index, image_path, max_tokens=30):
    """Generate a caption with beam search, keeping the beam_index best candidates."""
    image = load_image(image_path, size=(224, 224))
    image_batch = np.expand_dims(image, axis=0)
    transfer_values = image_model_transfer.predict(image_batch)

    token_int = [token_start]
    start_word = [[token_int, 0.0]]
    count_tokens = 0
    while len(start_word[0][0]) < max_tokens:
        temp = []
        for s in start_word:
            par_caps = pad_sequences([s[0]], maxlen=max_tokens, padding='post')
            preds = decoder_model.predict([transfer_values, par_caps], verbose=0)
            token_onehot = preds[0, count_tokens, :]

            # Extend this candidate with its beam_index most likely next tokens.
            # Note that scores accumulate the raw outputs rather than
            # log-probabilities, which tends to favor longer captions.
            word_preds = np.argsort(token_onehot)[-beam_index:]
            for w in word_preds:
                next_cap, prob = s[0][:], s[1]
                next_cap.append(w)
                prob += token_onehot[w]
                temp.append([next_cap, prob])

        start_word = temp
        count_tokens += 1

        # Keep only the beam_index highest-scoring candidates.
        start_word = sorted(start_word, key=lambda l: l[1])
        start_word = start_word[-beam_index:]

    # Take the best candidate and cut it off at the end marker.
    start_word = start_word[-1][0]
    intermediate_caption = [tokenizer.token_to_word(i) for i in start_word]
    final_caption = []
    for i in intermediate_caption:
        if i != 'eeee':
            final_caption.append(i)
        else:
            break

    # Drop the leading start marker.
    return final_caption[1:]


def generate_caption_any(image_path):
    """
    Caption an image. greedy_search and wider beams (e.g. beam_index=5)
    are available above, but only the beam_index=3 caption is returned,
    so only that one is computed.
    """
    return ' '.join(beam_search(beam_index=3, image_path=image_path))
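
# Streamlit front-end: the user picks one of the bundled ISRO clips; on
# submit, the middle keyframe is extracted, classified, and captioned.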


def main():
    st.title("ISRO Video Classification & Captioning")
    st.write('This project introduces a technique for video classification and '
             'captioning that uses keyframe extraction to streamline the process. '
             'Built on DenseNet201, the model classifies a video from its most '
             'informative frame, improving efficiency and performance. Try it with '
             'any of the three provided videos, or supply additional ISRO footage '
             'to see the model in action.')

    video_options = {
        "Video 1": "Video001-Scene-001.mp4",
        "Video 2": "Video015-Scene-074.mp4",
        "Video 3": "Video005-Scene-043.mp4",
    }

    selected_video = st.selectbox("Select a video to submit", list(video_options.keys()))
    video_path = video_options[selected_video]

    if st.button("Submit"):
        st.video(video_path)
        path = final_image(video_path)
        image = image_test(path)
        output_class = model1.predict(image)
        caption = generate_caption_any(path)
        indices = largest_indices(output_class, 3)
        st.title('The predicted category is:')
        st.write(le.inverse_transform(indices)[0])
        st.title('Caption:')
        st.write(caption.capitalize())


if __name__ == "__main__":
    main()