Spaces:
Running
Running
import os | |
import cv2 | |
import smtplib | |
import gradio as gr | |
from email import encoders | |
from ultralytics import YOLO | |
from email.mime.text import MIMEText | |
from email.mime.base import MIMEBase | |
from email.mime.multipart import MIMEMultipart | |
# Mail settings. Addresses and the password are read from environment
# variables so that no credentials live in the source file.
sender_email = os.environ.get("sender_email")
receiver_email = os.environ.get("receiver_email")
sender_password = os.environ.get("sender_password")
# send_email() upgrades the connection with STARTTLS, which Gmail serves on
# port 587 (8080 is not a Gmail SMTP port). An optional "smtp_port"
# environment variable overrides the default.
smtp_port = int(os.environ.get("smtp_port", "587"))
smtp_server = "smtp.gmail.com"
subject = "Accident detected"
def send_email(accident_type, image, filename="accident.jpg"):
    """Email an accident alert with the annotated frame attached as a JPEG.

    Args:
        accident_type: Text used as the plain-text body of the message.
        image: Image array handed to ``cv2.imencode`` (OpenCV expects BGR
            channel order when encoding).
        filename: Name given to the attachment in the message.

    Raises:
        ValueError: If OpenCV reports that the image could not be encoded.
        smtplib.SMTPException: If connecting, logging in or sending fails.
    """
    # Encode first so that no connection is opened for an unusable frame.
    is_success, buffer = cv2.imencode(".jpg", image)
    if not is_success:
        raise ValueError("Could not encode the image as JPEG")

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Subject'] = subject
    msg.attach(MIMEText(accident_type, 'plain'))

    attachment_package = MIMEBase('application', 'octet-stream')
    attachment_package.set_payload(buffer.tobytes())
    encoders.encode_base64(attachment_package)
    # Passing filename as a keyword lets the email package quote it properly.
    attachment_package.add_header(
        'Content-Disposition', 'attachment', filename=filename
    )
    msg.attach(attachment_package)

    print("Connecting to server")
    # The context manager issues QUIT and closes the socket even when
    # starttls/login/sendmail raises.
    with smtplib.SMTP(smtp_server, smtp_port) as gmail_server:
        gmail_server.starttls()
        gmail_server.login(sender_email, sender_password)
        print("Successfully Connected to Server")
        print("Sending email to ", receiver_email)
        gmail_server.sendmail(sender_email, receiver_email, msg.as_string())
        print("Email sent to ", receiver_email)
def check_acc(box):
    """Return the alert text for the first accident class found in ``box``.

    ``box.cls.tolist()`` is scanned in order; the first class index that has
    an entry in the table below decides the result. Indices without an entry
    (0 and 4 among them) are skipped. An empty string is returned when no
    listed class is present.
    """
    labels_by_class = {
        1: "Bike Bike Accident Detected",
        2: "Bike Object Accident Detected",
        3: "Bike Person Accident Detected",
        5: "Car Bike Accident Detected",
        6: "Car Car Accident Detected",
        7: "Car Object Accident Detected",
        8: "Car Person Accident Detected",
    }
    for class_index in box.cls.tolist():
        # Float indices such as 6.0 hash and compare equal to the int keys.
        label = labels_by_class.get(class_index)
        if label is not None:
            return label
    return ""
def image_predict(image):
    """Run accident detection on one image and email an alert on a hit.

    Args:
        image: Image array from the Gradio image input, or ``None`` when no
            image was supplied. NOTE(review): assumed to be an RGB array,
            Gradio's default for ``gr.Image`` — confirm if the component
            type is ever changed.

    Returns:
        A ``(label, frame)`` tuple: the accident description and the
        annotated RGB frame when an accident class is detected, otherwise a
        status message and ``None``.
    """
    if image is None:
        # Nothing was uploaded; never hand an empty source to the model.
        return ("No input provided", None)

    model = YOLO("best.pt")
    # Ultralytics documents numpy sources as BGR, so convert the RGB frame
    # before inference rather than feeding swapped channels to the model.
    image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    results = model.predict(image_bgr, conf=0.6, iou=0.3, imgsz=512)
    res = check_acc(results[0].boxes)
    if not res:
        return ("No Class Detected", None)

    # plot() returns a BGR array: ready for cv2.imencode inside send_email,
    # but it must be converted back to RGB for display in Gradio.
    annotated_bgr = results[0].plot()
    send_email(res, annotated_bgr)
    return (res, cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB))
def extract_frames(video):
    """Sample frames from a video and stop at the first detected accident.

    Roughly ``nof`` frames per second of video are passed to the model. On
    the first sampled frame that contains an accident class, an alert email
    is sent and processing stops.

    Args:
        video: Source handed to ``cv2.VideoCapture`` (Gradio supplies the
            path of the uploaded file).

    Returns:
        A ``(label, frame)`` tuple: the accident description and the
        annotated RGB frame on a hit, otherwise ``("No Class Detected",
        None)``.
    """
    nof = 4  # frames to analyse per second of video
    vidcap = cv2.VideoCapture(video)
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        # A reported fps below nof (or 0 when unknown) would make the stride
        # 0 and the modulo below raise ZeroDivisionError; fall back to
        # analysing every frame in that case.
        stride = max(1, int(fps / nof))
        # Load the weights once instead of once per sampled frame.
        model = YOLO("best.pt")
        frame_no = 0
        while vidcap.isOpened():
            success, image = vidcap.read()
            if not success:
                break
            if frame_no % stride == 0:
                results = model.predict(image, conf=0.6, iou=0.3, imgsz=512)
                res = check_acc(results[0].boxes)
                if res:
                    # OpenCV frames and plot() output are both BGR: email the
                    # frame as-is and convert to RGB only for Gradio.
                    annotated_bgr = results[0].plot()
                    send_email(res, annotated_bgr)
                    return (res, cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB))
            frame_no += 1
    finally:
        # Always free the capture handle, including on the early return.
        vidcap.release()
    return ("No Class Detected", None)
def take_input(image, video):
    """Dispatch the Detect click to the video or the image pipeline.

    A supplied video takes precedence over the image; the chosen pipeline's
    ``(label, frame)`` result is returned unchanged.
    """
    if video is None:
        return image_predict(image)
    return extract_frames(video)
# Gradio page: image and video inputs, a text label and an annotated image as
# outputs, and a Detect button wired to take_input.
# NOTE(review): the indentation of this section was lost in the reviewed copy;
# all four components are assumed to sit inside the single Row — confirm
# against the intended layout.
with gr.Blocks(
    title="YOLOS Object Detection",
    css=".gradio-container {background:lightyellow;}",
) as demo:
    gr.HTML('<h1>Accident Detection Using Yolov8</h1>')
    gr.HTML("<br>")
    with gr.Row():
        input_image = gr.Image(label="Input image")
        input_video = gr.Video(label="Input video")
        output_label = gr.Text(label="output label")
        output_image = gr.Image(label="Output image")
    gr.HTML("<br>")
    send_btn = gr.Button("Detect")
    gr.HTML("<br>")
    # take_input returns (label, frame), matching the two outputs in order.
    send_btn.click(
        fn=take_input,
        inputs=[input_image, input_video],
        outputs=[output_label, output_image],
    )

demo.launch(debug=True)