radames's picture
Upload 3 files
06d3d8c
import os
import sys
import threading
import time
import tkinter as tk
from multiprocessing import Queue
from typing import List
from PIL import Image, ImageTk
from streamdiffusion.image_utils import postprocess_image
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
def update_image(image_data: Image.Image, label: tk.Label) -> None:
"""
Update the image displayed on a Tkinter label.
Parameters
----------
image_data : Image.Image
The image to be displayed.
label : tk.Label
The labels where the image will be updated.
"""
width = 512
height = 512
tk_image = ImageTk.PhotoImage(image_data, size=width)
label.configure(image=tk_image, width=width, height=height)
label.image = tk_image # keep a reference
def _receive_images(
queue: Queue, fps_queue: Queue, label: tk.Label, fps_label: tk.Label
) -> None:
"""
Continuously receive images from a queue and update the labels.
Parameters
----------
queue : Queue
The queue to receive images from.
fps_queue : Queue
The queue to put the calculated fps.
label : tk.Label
The label to update with images.
fps_label : tk.Label
The label to show fps.
"""
while True:
try:
if not queue.empty():
label.after(
0,
update_image,
postprocess_image(queue.get(block=False), output_type="pil")[0],
label,
)
if not fps_queue.empty():
fps_label.config(text=f"FPS: {fps_queue.get(block=False):.2f}")
time.sleep(0.0005)
except KeyboardInterrupt:
return
def receive_images(queue: Queue, fps_queue: Queue) -> None:
"""
Setup the Tkinter window and start the thread to receive images.
Parameters
----------
queue : Queue
The queue to receive images from.
fps_queue : Queue
The queue to put the calculated fps.
"""
root = tk.Tk()
root.title("Image Viewer")
label = tk.Label(root)
fps_label = tk.Label(root, text="FPS: 0")
label.grid(column=0)
fps_label.grid(column=1)
def on_closing():
print("window closed")
root.quit() # stop event loop
return
thread = threading.Thread(
target=_receive_images, args=(queue, fps_queue, label, fps_label), daemon=True
)
thread.start()
try:
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
except KeyboardInterrupt:
return