Spaces:
Running
Running
from os import listdir | |
from os.path import isfile, join | |
import html | |
import gradio as gr | |
import pipe | |
from io_utils import get_logs_file | |
# Directory that is listed for files whose names end in "_log".
LOG_PATH = "./tmp"
# Directory that is listed for ".yaml" configuration files.
CONFIG_PATH = "./cicd/configs/"
# Maximum number of files read by get_accordions_of_files; also the fixed
# length of the list that function returns.
MAX_FILES_NUM = 20
def get_accordions_of_files(path, files):
    """Read the text of up to MAX_FILES_NUM files into a fixed-length list.

    Args:
        path: Directory that contains the files.
        files: Sequence of file names relative to ``path``.

    Returns:
        A list of exactly MAX_FILES_NUM entries. Entry ``i`` holds the full
        text of ``files[i]``; slots with no corresponding file stay ``None``.
        File names beyond the first MAX_FILES_NUM are ignored.

    Raises:
        OSError: If one of the files cannot be opened or read.
    """
    components = [None] * MAX_FILES_NUM
    # Slicing caps the number of files read, so no index check is needed.
    for slot, file_name in enumerate(files[:MAX_FILES_NUM]):
        with open(join(path, file_name), "r") as f:
            components[slot] = f.read()
    return components
def get_accordions_of_log_files():
    """Return the contents of the ``*_log`` files found directly in LOG_PATH.

    Only regular files whose names end in "_log" are considered. The reading
    itself, and the shape of the returned list, is delegated to
    get_accordions_of_files.
    """

    def is_log_file(name):
        # Same two checks, in the same order, for every directory entry.
        return isfile(join(LOG_PATH, name)) and name.endswith("_log")

    selected = list(filter(is_log_file, listdir(LOG_PATH)))
    return get_accordions_of_files(LOG_PATH, selected)
def get_accordions_of_config_files():
    """Return the contents of the ``.yaml`` files found directly in CONFIG_PATH.

    Only regular files whose names end in ".yaml" are considered. The reading
    itself, and the shape of the returned list, is delegated to
    get_accordions_of_files.
    """
    yaml_names = []
    for entry in listdir(CONFIG_PATH):
        if isfile(join(CONFIG_PATH, entry)) and entry.endswith(".yaml"):
            yaml_names.append(entry)
    return get_accordions_of_files(CONFIG_PATH, yaml_names)
def get_config_files():
    """List the paths of the ``.yaml`` files found directly in CONFIG_PATH.

    Returns:
        A list of paths, each being CONFIG_PATH joined with the file name, for
        every regular file in CONFIG_PATH whose name ends in ".yaml". The
        order is whatever ``listdir`` yields.
    """
    found = []
    for entry in listdir(CONFIG_PATH):
        candidate = join(CONFIG_PATH, entry)
        if isfile(candidate) and entry.endswith(".yaml"):
            found.append(candidate)
    return found
def get_jobs_info_in_queue():
    """Return one HTML line per entry in ``pipe.jobs``.

    Each line is built from the entry's elements at index 0 and index 2, both
    HTML-escaped, and ends with a ``<br/>`` tag.
    """
    rendered = []
    for job in pipe.jobs:
        job_id = html.escape(job[0])
        info = html.escape(job[2])
        rendered.append(f"⌛️job id {job_id}: {info}<br/>")
    return rendered
def get_queue_status():
    """Build the HTML snippet that describes the job queue.

    Returns:
        A ``<div>`` string. When ``pipe.jobs`` is non-empty or ``pipe.current``
        is not None, it shows the current job followed by the lines produced
        by get_jobs_info_in_queue; otherwise it shows a "no jobs" hint.
    """
    if len(pipe.jobs) > 0 or pipe.current is not None:
        # This branch is also taken when only ``pipe.jobs`` is non-empty, so
        # ``pipe.current`` may be None here. html.escape() calls str methods on
        # its argument and would raise AttributeError on None, so convert to
        # str first.
        current_job = html.escape(str(pipe.current))
        queued_jobs = "".join(get_jobs_info_in_queue())
        return (
            f'<div style="padding-top: 5%">Current job: {current_job} <br/> '
            f"Jobs in queue: <br/> {queued_jobs}</div>"
        )
    return '<div style="padding-top: 5%">No jobs in queue, please submit an evaluation task from another tab.</div>'
def get_demo():
    """Declare the Gradio components of this tab.

    Three sections are created in order: a row with the queue-status HTML, a
    collapsed "Log Files" accordion with a log textbox, and a collapsed
    "Config Files" accordion with a file list. Every component takes a
    callable as its value together with an ``every`` interval.
    """
    # NOTE(review): the nesting below (row and both accordions as siblings)
    # is reconstructed from whitespace-flattened source; confirm against the
    # original layout.
    with gr.Row():
        gr.HTML(value=get_queue_status, every=5)

    log_section = gr.Accordion(label="Log Files", open=False)
    with log_section:
        gr.Textbox(
            value=get_logs_file,
            every=0.5,
            lines=10,
            visible=True,
            label="Log File",
        )

    config_section = gr.Accordion(label="Config Files", open=False)
    with config_section:
        gr.Files(
            value=get_config_files,
            label="Config Files",
            every=10,
        )