GenAI-Arena / serve /vote_utils.py
tianleliphoebe's picture
update video generation
26dad4e
raw
history blame
39.9 kB
import datetime
import time
import json
import uuid
import gradio as gr
import regex as re
from pathlib import Path
from .utils import *
from .log_utils import build_logger
from .constants import IMAGE_DIR, VIDEO_DIR
import imageio
ig_logger = build_logger("gradio_web_server_image_generation", "gr_web_image_generation.log") # ig = image generation, loggers for single model direct chat
igm_logger = build_logger("gradio_web_server_image_generation_multi", "gr_web_image_generation_multi.log") # igm = image generation multi, loggers for side-by-side and battle
ie_logger = build_logger("gradio_web_server_image_editing", "gr_web_image_editing.log") # ie = image editing, loggers for single model direct chat
iem_logger = build_logger("gradio_web_server_image_editing_multi", "gr_web_image_editing_multi.log") # iem = image editing multi, loggers for side-by-side and battle
vg_logger = build_logger("gradio_web_server_video_generation", "gr_web_video_generation.log") # vg = video generation, loggers for single model direct chat
vgm_logger = build_logger("gradio_web_server_video_generation_multi", "gr_web_video_generation_multi.log") # vgm = video generation multi, loggers for side-by-side and battle
def vote_last_response_ig(state, vote_type, model_selector, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"model": model_selector,
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
output_file = f'{IMAGE_DIR}/generation/{state.conv_id}.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(output_file)
def vote_last_response_igm(states, vote_type, model_selectors, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"models": [x for x in model_selectors],
"states": [x.dict() for x in states],
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for state in states:
output_file = f'{IMAGE_DIR}/generation/{state.conv_id}.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(output_file)
def vote_last_response_ie(state, vote_type, model_selector, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"model": model_selector,
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
output_file = f'{IMAGE_DIR}/edition/{state.conv_id}.jpg'
source_file = f'{IMAGE_DIR}/edition/{state.conv_id}_source.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
with open(source_file, 'w') as sf:
state.source_image.save(sf, 'JPEG')
save_image_file_on_log_server(output_file)
save_image_file_on_log_server(source_file)
def vote_last_response_iem(states, vote_type, model_selectors, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"models": [x for x in model_selectors],
"states": [x.dict() for x in states],
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for state in states:
output_file = f'{IMAGE_DIR}/edition/{state.conv_id}.jpg'
source_file = f'{IMAGE_DIR}/edition/{state.conv_id}_source.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
with open(source_file, 'w') as sf:
state.source_image.save(sf, 'JPEG')
save_image_file_on_log_server(output_file)
save_image_file_on_log_server(source_file)
def vote_last_response_vg(state, vote_type, model_selector, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"model": model_selector,
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
output_file = f'{VIDEO_DIR}/generation/{state.conv_id}.mp4'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
if state.model_name.startswith('fal'):
r = requests.get(state.output)
with open(output_file, 'wb') as outfile:
outfile.write(r.content)
else:
print("======== video shape: ========")
print(state.output.shape)
imageio.mimwrite(output_file, state.output, fps=8, quality=9)
save_video_file_on_log_server(output_file)
def vote_last_response_vgm(states, vote_type, model_selectors, request: gr.Request):
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(time.time(), 4),
"type": vote_type,
"models": [x for x in model_selectors],
"states": [x.dict() for x in states],
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for state in states:
output_file = f'{VIDEO_DIR}/generation/{state.conv_id}.mp4'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
if state.model_name.startswith('fal'):
r = requests.get(state.output)
with open(output_file, 'wb') as outfile:
outfile.write(r.content)
else:
print("======== video shape: ========")
print(state.output.shape)
imageio.mimwrite(output_file, state.output, fps=8, quality=9)
save_video_file_on_log_server(output_file)
## Image Generation (IG) Single Model Direct Chat
def upvote_last_response_ig(state, model_selector, request: gr.Request):
ip = get_ip(request)
ig_logger.info(f"upvote. ip: {ip}")
vote_last_response_ig(state, "upvote", model_selector, request)
return ("",) + (disable_btn,) * 3
def downvote_last_response_ig(state, model_selector, request: gr.Request):
ip = get_ip(request)
ig_logger.info(f"downvote. ip: {ip}")
vote_last_response_ig(state, "downvote", model_selector, request)
return ("",) + (disable_btn,) * 3
def flag_last_response_ig(state, model_selector, request: gr.Request):
ip = get_ip(request)
ig_logger.info(f"flag. ip: {ip}")
vote_last_response_ig(state, "flag", model_selector, request)
return ("",) + (disable_btn,) * 3
## Image Generation Multi (IGM) Side-by-Side and Battle
def leftvote_last_response_igm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
igm_logger.info(f"leftvote (named). ip: {get_ip(request)}")
vote_last_response_igm(
[state0, state1], "leftvote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def rightvote_last_response_igm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
igm_logger.info(f"rightvote (named). ip: {get_ip(request)}")
vote_last_response_igm(
[state0, state1], "rightvote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def tievote_last_response_igm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
igm_logger.info(f"tievote (named). ip: {get_ip(request)}")
vote_last_response_igm(
[state0, state1], "tievote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def bothbad_vote_last_response_igm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
igm_logger.info(f"bothbad_vote (named). ip: {get_ip(request)}")
vote_last_response_igm(
[state0, state1], "bothbad_vote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
## Image Editing (IE) Single Model Direct Chat
def upvote_last_response_ie(state, model_selector, request: gr.Request):
ip = get_ip(request)
ie_logger.info(f"upvote. ip: {ip}")
vote_last_response_ie(state, "upvote", model_selector, request)
return ("", "", gr.Image(height=512, width=512, type="pil"), "",) + (disable_btn,) * 3
def downvote_last_response_ie(state, model_selector, request: gr.Request):
ip = get_ip(request)
ie_logger.info(f"downvote. ip: {ip}")
vote_last_response_ie(state, "downvote", model_selector, request)
return ("", "", gr.Image(height=512, width=512, type="pil"), "",) + (disable_btn,) * 3
def flag_last_response_ie(state, model_selector, request: gr.Request):
ip = get_ip(request)
ie_logger.info(f"flag. ip: {ip}")
vote_last_response_ie(state, "flag", model_selector, request)
return ("", "", gr.Image(height=512, width=512, type="pil"), "",) + (disable_btn,) * 3
## Image Editing Multi (IEM) Side-by-Side and Battle
def leftvote_last_response_iem(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
iem_logger.info(f"leftvote (anony). ip: {get_ip(request)}")
vote_last_response_iem(
[state0, state1], "leftvote", [model_selector0, model_selector1], request
)
# names = (
# "### Model A: " + state0.model_name,
# "### Model B: " + state1.model_name,
# )
# names = (state0.model_name, state1.model_name)
names = (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
return names + ("", "", gr.Image(height=512, width=512, type="pil"), "") + (disable_btn,) * 4
def rightvote_last_response_iem(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
iem_logger.info(f"rightvote (anony). ip: {get_ip(request)}")
vote_last_response_iem(
[state0, state1], "rightvote", [model_selector0, model_selector1], request
)
# names = (
# "### Model A: " + state0.model_name,
# "### Model B: " + state1.model_name,
# )
names = (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
return names + ("", "", gr.Image(height=512, width=512, type="pil"), "") + (disable_btn,) * 4
def tievote_last_response_iem(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
iem_logger.info(f"tievote (anony). ip: {get_ip(request)}")
vote_last_response_iem(
[state0, state1], "tievote", [model_selector0, model_selector1], request
)
names = (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
return names + ("", "", gr.Image(height=512, width=512, type="pil"), "") + (disable_btn,) * 4
def bothbad_vote_last_response_iem(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
iem_logger.info(f"bothbad_vote (anony). ip: {get_ip(request)}")
vote_last_response_iem(
[state0, state1], "bothbad_vote", [model_selector0, model_selector1], request
)
names = (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
return names + ("", "", gr.Image(height=512, width=512, type="pil"), "") + (disable_btn,) * 4
## Video Generation (VG) Single Model Direct Chat
def upvote_last_response_vg(state, model_selector, request: gr.Request):
ip = get_ip(request)
vg_logger.info(f"upvote. ip: {ip}")
vote_last_response_vg(state, "upvote", model_selector, request)
return ("",) + (disable_btn,) * 3
def downvote_last_response_vg(state, model_selector, request: gr.Request):
ip = get_ip(request)
vg_logger.info(f"downvote. ip: {ip}")
vote_last_response_vg(state, "downvote", model_selector, request)
return ("",) + (disable_btn,) * 3
def flag_last_response_vg(state, model_selector, request: gr.Request):
ip = get_ip(request)
vg_logger.info(f"flag. ip: {ip}")
vote_last_response_vg(state, "flag", model_selector, request)
return ("",) + (disable_btn,) * 3
## Image Generation Multi (IGM) Side-by-Side and Battle
def leftvote_last_response_vgm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
vgm_logger.info(f"leftvote (named). ip: {get_ip(request)}")
vote_last_response_vgm(
[state0, state1], "leftvote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def rightvote_last_response_vgm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
vgm_logger.info(f"rightvote (named). ip: {get_ip(request)}")
vote_last_response_vgm(
[state0, state1], "rightvote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def tievote_last_response_vgm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
vgm_logger.info(f"tievote (named). ip: {get_ip(request)}")
vote_last_response_vgm(
[state0, state1], "tievote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
def bothbad_vote_last_response_vgm(
state0, state1, model_selector0, model_selector1, request: gr.Request
):
vgm_logger.info(f"bothbad_vote (named). ip: {get_ip(request)}")
vote_last_response_vgm(
[state0, state1], "bothbad_vote", [model_selector0, model_selector1], request
)
return ("",) + (disable_btn,) * 4 + (gr.Markdown(f"### Model A: {state0.model_name}", visible=True), gr.Markdown(f"### Model B: {state1.model_name}", visible=True))
share_js = """
function (a, b, c, d) {
const captureElement = document.querySelector('#share-region-named');
html2canvas(captureElement)
.then(canvas => {
canvas.style.display = 'none'
document.body.appendChild(canvas)
return canvas
})
.then(canvas => {
const image = canvas.toDataURL('image/png')
const a = document.createElement('a')
a.setAttribute('download', 'chatbot-arena.png')
a.setAttribute('href', image)
a.click()
canvas.remove()
});
return [a, b, c, d];
}
"""
def share_click_igm(state0, state1, model_selector0, model_selector1, request: gr.Request):
igm_logger.info(f"share (anony). ip: {get_ip(request)}")
if state0 is not None and state1 is not None:
vote_last_response_igm(
[state0, state1], "share", [model_selector0, model_selector1], request
)
def share_click_iem(state0, state1, model_selector0, model_selector1, request: gr.Request):
iem_logger.info(f"share (anony). ip: {get_ip(request)}")
if state0 is not None and state1 is not None:
vote_last_response_iem(
[state0, state1], "share", [model_selector0, model_selector1], request
)
## All Generation Gradio Interface
class ImageStateIG:
def __init__(self, model_name):
self.conv_id = uuid.uuid4().hex
self.model_name = model_name
self.prompt = None
self.output = None
def dict(self):
base = {
"conv_id": self.conv_id,
"model_name": self.model_name,
"prompt": self.prompt
}
return base
class ImageStateIE:
def __init__(self, model_name):
self.conv_id = uuid.uuid4().hex
self.model_name = model_name
self.source_prompt = None
self.target_prompt = None
self.instruct_prompt = None
self.source_image = None
self.output = None
def dict(self):
base = {
"conv_id": self.conv_id,
"model_name": self.model_name,
"source_prompt": self.source_prompt,
"target_prompt": self.target_prompt,
"instruct_prompt": self.instruct_prompt
}
return base
class VideoStateVG:
def __init__(self, model_name):
self.conv_id = uuid.uuid4().hex
self.model_name = model_name
self.prompt = None
self.output = None
def dict(self):
base = {
"conv_id": self.conv_id,
"model_name": self.model_name,
"prompt": self.prompt
}
return base
def generate_ig(gen_func, state, text, model_name, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if not model_name:
raise gr.Warning("Model name cannot be empty.")
if state is None:
state = ImageStateIG(model_name)
ip = get_ip(request)
ig_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
generated_image = gen_func(text, model_name)
state.prompt = text
state.output = generated_image
state.model_name = model_name
yield state, generated_image
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
output_file = f'{IMAGE_DIR}/generation/{state.conv_id}.jpg'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(output_file)
def generate_igm(gen_func, state0, state1, text, model_name0, model_name1, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if not model_name0:
raise gr.Warning("Model name A cannot be empty.")
if not model_name1:
raise gr.Warning("Model name B cannot be empty.")
if state0 is None:
state0 = ImageStateIG(model_name0)
if state1 is None:
state1 = ImageStateIG(model_name1)
ip = get_ip(request)
igm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
# Remove ### Model (A|B): from model name
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_image0, generated_image1 = gen_func(text, model_name0, model_name1)
state0.prompt = text
state1.prompt = text
state0.output = generated_image0
state1.output = generated_image1
state0.model_name = model_name0
state1.model_name = model_name1
yield state0, state1, generated_image0, generated_image1
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
output_file = f'{IMAGE_DIR}/generation/{state.conv_id}.jpg'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(output_file)
def generate_igm_annoy(gen_func, state0, state1, text, model_name0, model_name1, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if state0 is None:
state0 = ImageStateIG(model_name0)
if state1 is None:
state1 = ImageStateIG(model_name1)
ip = get_ip(request)
igm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_image0, generated_image1, model_name0, model_name1 = gen_func(text, model_name0, model_name1)
state0.prompt = text
state1.prompt = text
state0.output = generated_image0
state1.output = generated_image1
state0.model_name = model_name0
state1.model_name = model_name1
yield state0, state1, generated_image0, generated_image1, \
gr.Markdown(f"### Model A: {model_name0}"), gr.Markdown(f"### Model B: {model_name1}")
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
output_file = f'{IMAGE_DIR}/generation/{state.conv_id}.jpg'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(output_file)
def generate_ie(gen_func, state, source_text, target_text, instruct_text, source_image, model_name, request: gr.Request):
if not source_text:
raise gr.Warning("Source prompt cannot be empty.")
if not target_text:
raise gr.Warning("Target prompt cannot be empty.")
if not instruct_text:
raise gr.Warning("Instruction prompt cannot be empty.")
if not source_image:
raise gr.Warning("Source image cannot be empty.")
if not model_name:
raise gr.Warning("Model name cannot be empty.")
if state is None:
state = ImageStateIE(model_name)
ip = get_ip(request)
ig_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
generated_image = gen_func(source_text, target_text, instruct_text, source_image, model_name)
state.source_prompt = source_text
state.target_prompt = target_text
state.instruct_prompt = instruct_text
state.source_image = source_image
state.output = generated_image
state.model_name = model_name
yield state, generated_image
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
src_img_file = f'{IMAGE_DIR}/edition/{state.conv_id}_src.jpg'
os.makedirs(os.path.dirname(src_img_file), exist_ok=True)
with open(src_img_file, 'w') as f:
state.source_image.save(f, 'JPEG')
output_file = f'{IMAGE_DIR}/edition/{state.conv_id}_out.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(src_img_file)
save_image_file_on_log_server(output_file)
def generate_iem(gen_func, state0, state1, source_text, target_text, instruct_text, source_image, model_name0, model_name1, request: gr.Request):
if not source_text:
raise gr.Warning("Source prompt cannot be empty.")
if not target_text:
raise gr.Warning("Target prompt cannot be empty.")
if not instruct_text:
raise gr.Warning("Instruction prompt cannot be empty.")
if not source_image:
raise gr.Warning("Source image cannot be empty.")
if not model_name0:
raise gr.Warning("Model name A cannot be empty.")
if not model_name1:
raise gr.Warning("Model name B cannot be empty.")
if state0 is None:
state0 = ImageStateIE(model_name0)
if state1 is None:
state1 = ImageStateIE(model_name1)
ip = get_ip(request)
igm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_image0, generated_image1 = gen_func(source_text, target_text, instruct_text, source_image, model_name0, model_name1)
state0.source_prompt = source_text
state0.target_prompt = target_text
state0.instruct_prompt = instruct_text
state0.source_image = source_image
state0.output = generated_image0
state0.model_name = model_name0
state1.source_prompt = source_text
state1.target_prompt = target_text
state1.instruct_prompt = instruct_text
state1.source_image = source_image
state1.output = generated_image1
state1.model_name = model_name1
yield state0, state1, generated_image0, generated_image1
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
src_img_file = f'{IMAGE_DIR}/edition/{state.conv_id}_src.jpg'
os.makedirs(os.path.dirname(src_img_file), exist_ok=True)
with open(src_img_file, 'w') as f:
state.source_image.save(f, 'JPEG')
output_file = f'{IMAGE_DIR}/edition/{state.conv_id}_out.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(src_img_file)
save_image_file_on_log_server(output_file)
def generate_iem_annoy(gen_func, state0, state1, source_text, target_text, instruct_text, source_image, model_name0, model_name1, request: gr.Request):
if not source_text:
raise gr.Warning("Source prompt cannot be empty.")
if not target_text:
raise gr.Warning("Target prompt cannot be empty.")
if not instruct_text:
raise gr.Warning("Instruction prompt cannot be empty.")
if not source_image:
raise gr.Warning("Source image cannot be empty.")
if state0 is None:
state0 = ImageStateIE(model_name0)
if state1 is None:
state1 = ImageStateIE(model_name1)
ip = get_ip(request)
igm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_image0, generated_image1, model_name0, model_name1 = gen_func(source_text, target_text, instruct_text, source_image, model_name0, model_name1)
state0.source_prompt = source_text
state0.target_prompt = target_text
state0.instruct_prompt = instruct_text
state0.source_image = source_image
state0.output = generated_image0
state0.model_name = model_name0
state1.source_prompt = source_text
state1.target_prompt = target_text
state1.instruct_prompt = instruct_text
state1.source_image = source_image
state1.output = generated_image1
state1.model_name = model_name1
yield state0, state1, generated_image0, generated_image1, \
gr.Markdown(f"### Model A: {model_name0}"), gr.Markdown(f"### Model B: {model_name1}")
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
src_img_file = f'{IMAGE_DIR}/edition/{state.conv_id}_src.jpg'
os.makedirs(os.path.dirname(src_img_file), exist_ok=True)
with open(src_img_file, 'w') as f:
state.source_image.save(f, 'JPEG')
output_file = f'{IMAGE_DIR}/edition/{state.conv_id}_out.jpg'
with open(output_file, 'w') as f:
state.output.save(f, 'JPEG')
save_image_file_on_log_server(src_img_file)
save_image_file_on_log_server(output_file)
def generate_vg(gen_func, state, text, model_name, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if not model_name:
raise gr.Warning("Model name cannot be empty.")
if state is None:
state = VideoStateVG(model_name)
ip = get_ip(request)
vg_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
generated_video = gen_func(text, model_name)
state.prompt = text
state.output = generated_video
state.model_name = model_name
# yield state, generated_video
finish_tstamp = time.time()
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
output_file = f'{VIDEO_DIR}/generation/{state.conv_id}.mp4'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
if model_name.startswith('fal'):
r = requests.get(state.output)
with open(output_file, 'wb') as outfile:
outfile.write(r.content)
else:
print("======== video shape: ========")
print(state.output.shape)
imageio.mimwrite(output_file, state.output, fps=8, quality=9)
save_video_file_on_log_server(output_file)
yield state, output_file
def generate_vgm(gen_func, state0, state1, text, model_name0, model_name1, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if not model_name0:
raise gr.Warning("Model name A cannot be empty.")
if not model_name1:
raise gr.Warning("Model name B cannot be empty.")
if state0 is None:
state0 = VideoStateVG(model_name0)
if state1 is None:
state1 = VideoStateVG(model_name1)
ip = get_ip(request)
igm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
# Remove ### Model (A|B): from model name
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_video0, generated_video1 = gen_func(text, model_name0, model_name1)
state0.prompt = text
state1.prompt = text
state0.output = generated_video0
state1.output = generated_video1
state0.model_name = model_name0
state1.model_name = model_name1
print("====== model name =========")
print(state0.model_name)
print(state1.model_name)
finish_tstamp = time.time()
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
output_file = f'{VIDEO_DIR}/generation/{state.conv_id}.mp4'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
print(state.model_name)
if state.model_name.startswith('fal'):
r = requests.get(state.output)
with open(output_file, 'wb') as outfile:
outfile.write(r.content)
else:
print("======== video shape: ========")
print(state.output)
print(state.output.shape)
imageio.mimwrite(output_file, state.output, fps=8, quality=9)
save_video_file_on_log_server(output_file)
yield state0, state1, f'{VIDEO_DIR}/generation/{state0.conv_id}.mp4', f'{VIDEO_DIR}/generation/{state1.conv_id}.mp4'
def generate_vgm_annoy(gen_func, state0, state1, text, model_name0, model_name1, request: gr.Request):
if not text:
raise gr.Warning("Prompt cannot be empty.")
if state0 is None:
state0 = VideoStateVG(model_name0)
if state1 is None:
state1 = VideoStateVG(model_name1)
ip = get_ip(request)
vgm_logger.info(f"generate. ip: {ip}")
start_tstamp = time.time()
model_name0 = re.sub(r"### Model A: ", "", model_name0)
model_name1 = re.sub(r"### Model B: ", "", model_name1)
generated_video0, generated_video1, model_name0, model_name1 = gen_func(text, model_name0, model_name1)
state0.prompt = text
state1.prompt = text
state0.output = generated_video0
state1.output = generated_video1
state0.model_name = model_name0
state1.model_name = model_name1
# yield state0, state1, generated_video0, generated_video1, \
# gr.Markdown(f"### Model A: {model_name0}"), gr.Markdown(f"### Model B: {model_name1}")
finish_tstamp = time.time()
# logger.info(f"===output===: {output}")
with open(get_conv_log_filename(), "a") as fout:
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name0,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state0.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
data = {
"tstamp": round(finish_tstamp, 4),
"type": "chat",
"model": model_name1,
"gen_params": {},
"start": round(start_tstamp, 4),
"finish": round(finish_tstamp, 4),
"state": state1.dict(),
"ip": get_ip(request),
}
fout.write(json.dumps(data) + "\n")
append_json_item_on_log_server(data, get_conv_log_filename())
for i, state in enumerate([state0, state1]):
output_file = f'{VIDEO_DIR}/generation/{state.conv_id}.mp4'
os.makedirs(os.path.dirname(output_file), exist_ok=True)
if state.model_name.startswith('fal'):
r = requests.get(state.output)
with open(output_file, 'wb') as outfile:
outfile.write(r.content)
else:
print("======== video shape: ========")
print(state.output.shape)
imageio.mimwrite(output_file, state.output, fps=8, quality=9)
save_video_file_on_log_server(output_file)
yield state0, state1, f'{VIDEO_DIR}/generation/{state0.conv_id}.mp4', f'{VIDEO_DIR}/generation/{state1.conv_id}.mp4', \
gr.Markdown(f"### Model A: {model_name0}"), gr.Markdown(f"### Model B: {model_name1}")