# NOTE: "Spaces: Sleeping / Sleeping" - web-page status text captured along with this file; not part of the program.
import os | |
import logging | |
import json | |
from fastapi import FastAPI, UploadFile | |
from fastapi.responses import FileResponse | |
import gradio as gr | |
from PIL import Image | |
import PIL | |
import numpy as np | |
import pypdfium2 as pdfium | |
from ultralytics import YOLO | |
from ultralytics.engine.results import Results, Masks | |
import uvicorn | |
import cv2 | |
import uuid | |
from openai import OpenAI | |
# Root logging is configured at DEBUG level, so debug output from every module is emitted.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Instruction preamble placed ahead of the per-field questions and the
# extracted PDF text when the chat model is queried.
PROMPT = """
You are analyzing the spec sheet of a solar panel. If there is no text after the line \"PDF Extract Text Contents Below:\" report that there is no spec data provided as dictionary with a field called 'error'.
If there is text, please answer the following questions, format them as a JSON dictionary. Include the units of dimensions, weight, and cable lengths.\n
"""

# from solareyes.sam import SAM

# The API key is read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

app = FastAPI()

# Working directories; each one is created at import time when missing.
image_dir = './pdf_images/'
cropped_dir = './output/'
pdf_dir = './pdf_downloads/'

os.makedirs(image_dir, exist_ok=True)
os.makedirs(cropped_dir, exist_ok=True)
os.makedirs(pdf_dir, exist_ok=True)
def parse_pdf_text(file):
    """Extract the text of every page of a PDF and ask the chat model for spec fields.

    Args:
        file: Any input accepted by ``pdfium.PdfDocument``.

    Returns:
        The message content of the first completion choice. A JSON object is
        requested from the model through ``response_format``.
    """
    page_texts = []
    pdf = pdfium.PdfDocument(file)
    try:
        for page in pdf:
            try:
                textpage = page.get_textpage()
                try:
                    page_texts.append(textpage.get_text_bounded())
                finally:
                    textpage.close()
            finally:
                page.close()
    finally:
        # Release the document handle deterministically instead of waiting
        # for garbage collection; children are closed before their parent.
        pdf.close()

    all_text = "PDF Extract Text Contents Below: \n\n" + "".join(page_texts)
    logger.debug("Text: %s", all_text)

    # One question per spec field; each names the JSON key the answer must use.
    questions = (
        "What are module dimensions in L x W x H? Result key should be \"module_dimensions\"",
        "What is the module weight in kilograms? Result key should be \"module_weight\"",
        "What are the cable lengths in millimeters? Result key should be \"cable_length\"",
        "What brand, name, or model are the connectors? Result key should be \"connector\"",
        "How many pieces per container? Prefer 40' HQ or HC, if not available try 53' Result key should be \"pieces_per_container\"",
        "What is the model number? Result key should be \"model_number\"",
    )
    question = PROMPT + "\n".join(questions) + "\n" + all_text

    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": question,
            }
        ],
        model="gpt-3.5-turbo",
        response_format={"type": "json_object"},
    )
    return chat_completion.choices[0].message.content
def segment_solar_panel(image) -> Results:
    """Run the segmentation checkpoint on ``image`` and return the prediction output.

    The checkpoint 'model/autodistill_best_seg.pt' is loaded on every call.

    NOTE(review): the annotation says ``Results``, but ``segment_image_core``
    in this file iterates over the returned value - confirm whether a
    sequence of ``Results`` is what ``predict`` returns.
    """
    return YOLO('model/autodistill_best_seg.pt').predict(
        image,
        imgsz=(841, 595),
        retina_masks=True,
    )
def resize_and_pad(subject_image: Image.Image, canvas_size: int = 1200, fill_ratio: float = 0.8):
    """Centre a subject on a square canvas and return transparent and white-backed copies.

    The subject is scaled, keeping its aspect ratio, so that its longer side
    equals ``int(canvas_size * fill_ratio)`` pixels. Scaling by the longer
    side keeps wide subjects inside the canvas; scaling by height alone lets
    any subject wider than the canvas be cropped when pasted.

    Args:
        subject_image: Image to place; converted to RGBA when it is not already.
        canvas_size: Edge length in pixels of the square output canvas.
        fill_ratio: Fraction of the canvas edge the subject's longer side spans.

    Returns:
        ``(png_image, jpg_image)``: an RGBA canvas with a transparent
        background, and an RGB canvas with a white background.

    Raises:
        ValueError: If ``subject_image`` has a zero width or height.
    """
    if subject_image.width <= 0 or subject_image.height <= 0:
        raise ValueError("subject_image must have a non-zero width and height")

    # The subject is used as its own paste mask below, which requires an
    # alpha channel.
    if subject_image.mode == "RGBA":
        subject = subject_image
    else:
        subject = subject_image.convert("RGBA")

    target = int(canvas_size * fill_ratio)
    aspect_ratio = subject.width / subject.height
    if aspect_ratio <= 1:
        # Portrait or square: height is the longer side.
        new_height = target
        new_width = max(1, int(target * aspect_ratio))
    else:
        # Landscape: width is the longer side.
        new_width = target
        new_height = max(1, int(target / aspect_ratio))
    resized_subject = subject.resize((new_width, new_height), Image.LANCZOS)

    # Transparent canvas with the subject centred on it.
    png_image = Image.new("RGBA", (canvas_size, canvas_size), (0, 0, 0, 0))
    offset = ((canvas_size - new_width) // 2, (canvas_size - new_height) // 2)
    png_image.paste(resized_subject, offset, resized_subject)

    # Same composition flattened onto a white background.
    jpg_image = Image.new("RGB", (canvas_size, canvas_size), (255, 255, 255))
    jpg_image.paste(png_image, (0, 0), png_image)

    return png_image, jpg_image
def segment_image_core(img: np.ndarray | Image.Image) -> tuple[tuple[Image.Image, list], Image.Image | None]:
    """Segment an image and build annotation sections plus a padded cut-out.

    Args:
        img: Source image; a NumPy array is converted to a PIL image first.

    Returns:
        ``((img, sections), jpg_img)``. ``sections`` holds
        ``((x1, y1, x2, y2), label)`` entries for bounding boxes and
        ``(mask_array, label)`` entries for masks. ``jpg_img`` is the
        white-backed image produced by ``resize_and_pad`` for the last mask
        processed, or ``None`` when no mask was processed.
    """
    # ``import PIL`` on its own does not load the ImageFilter submodule.
    from PIL import ImageFilter

    if isinstance(img, np.ndarray):
        img = Image.fromarray(img)

    results = segment_solar_panel(img)
    sections = []
    jpg_img = None  # stays None when nothing is segmented

    # The structuring element does not depend on the loop. MORPH_ELLIPSE is
    # the shape constant with the same numeric value as the MORPH_OPEN
    # operation constant, so the kernel is unchanged.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))

    for result_index, result in enumerate(results):
        logger.debug("Result %d", result_index)
        try:
            h2, w2 = result.orig_img.shape[:2]
            masks = result.masks
            for box_index, box in enumerate(result.boxes):
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                sections.append(((int(x1), int(y1), int(x2), int(y2)), f"{section_labels[0]} Bounding Box - index {box_index} - conf {box.conf}"))
                try:
                    # Indexing fails with TypeError when ``masks`` is None.
                    mask_array = masks[box_index].cpu().data.numpy().transpose(1, 2, 0)
                    opened_mask = cv2.morphologyEx(mask_array, cv2.MORPH_OPEN, kernel, iterations=3)
                    cv_mask = cv2.resize(opened_mask, (w2, h2))
                    # Slightly blurred mask used as the alpha channel of the cut-out.
                    alpha = Image.fromarray((cv_mask * 255).astype(np.uint8)).filter(ImageFilter.GaussianBlur(1))
                    cutout = img.copy()
                    cutout.putalpha(alpha)
                    cutout = cutout.crop((x1, y1, x2, y2))
                    _png_img, jpg_img = resize_and_pad(cutout)
                    sections.append((cv_mask, f"{section_labels[0]} Mask - Index: {box_index}"))
                except TypeError as e:
                    logger.warning("Error processing image: %s, probably no masks.", e)
        except IndexError as e:
            logger.warning("Error processing image: %s, probably no boxes.", e)

    return (img, sections), jpg_img
def pdf_to_image(pdf, end = None, start = 0) -> list[Image.Image]:
    """Render a range of PDF pages to PIL images at 4x scale.

    Args:
        pdf: Any input accepted by ``pdfium.PdfDocument``.
        end: Index one past the last page to render; ``None`` means the
            document's page count.
        start: Index of the first page to render.

    Returns:
        One PIL image per rendered page, in page order.
    """
    document = pdfium.PdfDocument(pdf)
    try:
        if end is None:
            end = len(document)  # number of pages in the document
        page_images = []
        for page_index in range(start, end):
            page = document[page_index]
            try:
                page_images.append(page.render(scale=4).to_pil())
            finally:
                page.close()
        return page_images
    finally:
        # Release the document handle deterministically instead of waiting
        # for garbage collection.
        document.close()
def pdf_first_page_to_image(pdf) -> Image.Image:
    """Render only the first page of ``pdf`` and return it as a single image."""
    first_page_only = pdf_to_image(pdf, end=1, start=0)
    return first_page_only[0]
with gr.Blocks() as demo:
    # Label prefix for annotation captions; segment_image_core reads this name.
    section_labels = ['Solar Panel']

    def segment_image(img):
        """Return only the annotated-image payload of ``segment_image_core``."""
        annotated, _padded_jpg = segment_image_core(img)
        return annotated

    def process_pdf(pdf):
        """Render the first page of ``pdf`` and segment it."""
        return segment_image(pdf_first_page_to_image(pdf))

    # NOTE(review): the indentation of this file was lost, so which components
    # sit inside each ``gr.Row`` below is reconstructed - confirm the layout.
    pdf_input = gr.File(label="Upload PDF", file_types=['pdf'], height=100)
    pdf_image = gr.Gallery(label="PDF Page Images")
    pdf_to_image_btn = gr.Button("Convert PDF to Image")

    with gr.Row():
        img_output_pdf = gr.AnnotatedImage(label="Extracted product image", height=400)
    pdf_extract_btn = gr.Button("Identify Solar Panel from PDF")

    with gr.Row():
        text_input = gr.Textbox(label="Enter Text", placeholder=PROMPT)
        text_output = gr.Textbox(label="Output", placeholder="Spec analysis will appear here")
    pdf_text_btn = gr.Button("Extract specs from PDF Text")

    # Every entry found in the PDF download directory is offered as an example.
    gr.Examples(
        inputs=pdf_input,
        examples=[os.path.join(pdf_dir, entry) for entry in os.listdir(pdf_dir)],
    )

    # Wire each button to its handler; all of them take the uploaded PDF.
    pdf_extract_btn.click(fn=process_pdf, inputs=[pdf_input], outputs=img_output_pdf)
    pdf_text_btn.click(fn=parse_pdf_text, inputs=[pdf_input], outputs=text_output)
    pdf_to_image_btn.click(fn=pdf_to_image, inputs=[pdf_input], outputs=pdf_image)
#Accept a PDF file, return a jpeg image
def extract_image(uploadFile: UploadFile) -> FileResponse:
    """Segment the first page of an uploaded PDF and return the cut-out as a JPEG file.

    The image is saved under ``cropped_dir`` with a random UUID in its name
    and served from there.

    NOTE(review): no route decorator is visible on this handler - confirm
    how it is registered on ``app``.

    Args:
        uploadFile: The uploaded PDF.

    Returns:
        A ``FileResponse`` pointing at the saved JPEG.

    Raises:
        HTTPException: With status 422 when no cut-out image is returned by
            ``segment_image_core``.
    """
    from fastapi import HTTPException

    pdf_bytes = uploadFile.file.read()
    image = pdf_first_page_to_image(pdf_bytes)
    _sections, jpeg_image = segment_image_core(image)
    if jpeg_image is None:
        raise HTTPException(status_code=422, detail="No solar panel could be extracted from the PDF")

    # os.path.join avoids the doubled separator that plain concatenation
    # produces with a directory that already ends in '/'.
    filename = os.path.join(cropped_dir, f"cropped_{uuid.uuid4()}.jpg")
    jpeg_image.save(filename)
    return FileResponse(filename)
#Accept a PDF file, return a text summary
def parse_info(uploadFile: UploadFile):
    """Read an uploaded PDF, query the model about it, and return the decoded answer.

    Returns:
        ``{"answer": ...}`` where the value is the JSON-decoded result of
        ``parse_pdf_text``.
    """
    logger.info(f"Receiving file {uploadFile.filename}")
    pdf_bytes = uploadFile.file.read()
    logger.info(f"Received file {uploadFile.filename}")
    raw_answer = parse_pdf_text(pdf_bytes)
    decoded = json.loads(raw_answer)
    return {"answer": decoded}
# Mount the Gradio UI on the FastAPI application at the root path.
app = gr.mount_gradio_app(app=app, blocks=demo, path="/")

if __name__ == "__main__":
    # Alternatives left disabled by the author:
    # app = gr.mount_gradio_app(app, demo, path="/gradio")
    # demo.launch(share=True)
    # demo.launch(share=True, auth=(os.environ.get("GRADIO_USERNAME"), os.environ.get("GRADIO_PASSWORD")))
    uvicorn.run(app, port=7860)