import copy
import os
import re
import secrets
import shutil
import tempfile
from argparse import ArgumentParser, Namespace
from pathlib import Path

import gradio as gr
from modelscope import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, snapshot_download
# Global variables

# Restrict the process to the first two CUDA devices.
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'

# Default checkpoint and revision used when none is given on the command line.
DEFAULT_CKPT_PATH = 'qwen/Qwen-VL-Chat'
REVISION = 'v1.0.4'

# Matches a <box>...</box> span in a model response. Without the tag
# delimiters the lazy group only ever matches the empty string, which turns
# every re.sub() using this pattern into a no-op.
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"

# Full-width / CJK punctuation used by add_text() to trim one trailing
# punctuation mark. Restored from a mis-encoded (mojibake) literal that was
# additionally cut short by an ASCII quote followed by '#'.
PUNCTUATION = "!?。"#$%&'()*+,-/:;<=>@[\]^_`{|}~⦅⦆「」、、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."

# Directory where uploaded / generated images are stored.
uploaded_file_dir = os.environ.get("GRADIO_TEMP_DIR") or str(Path(tempfile.gettempdir()) / "gradio")

# Populated by _load_model_tokenizer(); read by predict().
tokenizer = None
model = None
def _get_args() -> Namespace:
    """Parse the demo's command-line options.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()`` with the
        attributes ``checkpoint_path``, ``revision``, ``cpu_only``, ``share``,
        ``inbrowser``, ``server_port`` and ``server_name``.
    """
    parser = ArgumentParser()
    parser.add_argument("-c", "--checkpoint-path", type=str, default=DEFAULT_CKPT_PATH,
                        help="Checkpoint name or path, default to %(default)r")
    parser.add_argument("--revision", type=str, default=REVISION)
    parser.add_argument("--cpu-only", action="store_true", help="Run demo with CPU only")
    parser.add_argument("--share", action="store_true", default=False,
                        help="Create a publicly shareable link for the interface.")
    parser.add_argument("--inbrowser", action="store_true", default=False,
                        help="Automatically launch the interface in a new tab on the default browser.")
    parser.add_argument("--server-port", type=int, default=8000,
                        help="Demo server port.")
    parser.add_argument("--server-name", type=str, default="127.0.0.1",
                        help="Demo server name.")
    return parser.parse_args()
def handle_image_submission(_chatbot, task_history, file) -> tuple:
    """Store an uploaded image, add it to the conversation and run the model.

    Args:
        _chatbot: Chatbot history (list of ``(user, bot)`` pairs); mutated in place.
        task_history: Model-side history; mutated in place.
        file: Uploaded file from the upload button, or ``None``.

    Returns:
        ``(_chatbot, task_history)`` so the result matches the two outputs
        this handler is wired to in the UI.
    """
    print("handle_image_submission called")
    if file is None:
        print("No file uploaded")
        return _chatbot, task_history

    print("File received:", file)
    file_path = save_image(file, uploaded_file_dir)
    print("File saved at:", file_path)

    history_item = ((file_path,), None)
    _chatbot.append(history_item)
    task_history.append(history_item)

    # predict() takes exactly (chatbot, task_history) and returns only the
    # chatbot list; pair it with task_history for the second output.
    return predict(_chatbot, task_history), task_history
def _load_model_tokenizer(args) -> tuple:
    """Download the checkpoint and load the tokenizer and model.

    The loaded objects are also stored in the module-level ``tokenizer`` and
    ``model`` globals.

    Args:
        args: Parsed CLI options; ``checkpoint_path``, ``revision`` and
            ``cpu_only`` are read.

    Returns:
        ``(model, tokenizer)``.
    """
    global tokenizer, model

    model_dir = snapshot_download(args.checkpoint_path, revision=args.revision)

    tokenizer = AutoTokenizer.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )

    # "cpu" pins everything to the CPU; "auto" leaves placement to the loader.
    device_map = "cpu" if args.cpu_only else "auto"

    loaded_model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        device_map=device_map,
        trust_remote_code=True,
        bf16=True,
        resume_download=True,
    )
    model = loaded_model.eval()
    model.generation_config = GenerationConfig.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )
    return model, tokenizer
def _parse_text(text: str) -> str:
lines = text.split("\n")
lines = [line for line in lines if line != ""]
count = 0
for i, line in enumerate(lines):
if "```" in line:
count += 1
items = line.split("`")
if count % 2 == 1:
lines[i] = f'
'
else:
lines[i] = f"
"
else:
if i > 0:
if count % 2 == 1:
line = line.replace("`", r"\`")
line = line.replace("<", "<")
line = line.replace(">", ">")
line = line.replace(" ", " ")
line = line.replace("*", "*")
line = line.replace("_", "_")
line = line.replace("-", "-")
line = line.replace(".", ".")
line = line.replace("!", "!")
line = line.replace("(", "(")
line = line.replace(")", ")")
line = line.replace("$", "$")
lines[i] = "
" + line
text = "".join(lines)
return text
def save_image(image_file, upload_dir: str) -> str:
    """Copy an uploaded image into ``upload_dir`` under a random file name.

    Args:
        image_file: Either a filesystem path (``str`` / ``os.PathLike``) or an
            object whose ``name`` attribute is the path of the uploaded file.
        upload_dir: Destination directory; created if it does not exist.

    Returns:
        The path of the stored copy as a string. The original file
        extension is preserved.
    """
    print("save_image called with:", image_file)

    # Plain paths are used directly; anything else must expose the path
    # through ``.name``. (Path objects are checked first because their
    # ``.name`` is only the final component, not the full path.)
    if isinstance(image_file, (str, os.PathLike)):
        source_path = Path(image_file)
    else:
        source_path = Path(image_file.name)

    target_dir = Path(upload_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    file_path = target_dir / (secrets.token_hex(10) + source_path.suffix)
    print("Saving to:", file_path)

    # copyfile streams the content instead of loading the file into memory.
    shutil.copyfile(source_path, file_path)
    return str(file_path)
def add_file(history, task_history, file):
    """Store an uploaded file and append it to both histories.

    NOTE: a later ``add_file`` definition in this module rebinds the name,
    so this version is not the one in effect at runtime.

    Args:
        history: Chatbot history; not mutated.
        task_history: Model-side history; not mutated.
        file: Uploaded file, or ``None``.

    Returns:
        ``(history, task_history)`` with one ``((path,), None)`` entry
        appended to each, or the inputs unchanged when ``file`` is ``None``.
    """
    if file is None:
        return history, task_history
    # save_image() requires the destination directory as its second argument.
    file_path = save_image(file, uploaded_file_dir)
    entry = ((file_path,), None)
    return history + [entry], task_history + [entry]
def predict(_chatbot, task_history) -> list:
    """Run the model on the latest chatbot entry and fill in its reply.

    Uses the module-level ``tokenizer`` and ``model``.

    Args:
        _chatbot: Chatbot history (list of ``(user, bot)`` pairs). The last
            entry's user part is either text or a one-element tuple holding
            an image path. Mutated in place.
        task_history: Model-side history. When non-empty, the raw response
            is recorded on its last entry so that ``regenerate`` can tell
            the turn has been answered.

    Returns:
        ``_chatbot`` with the last entry's reply set to the response text,
        to a one-element tuple with the path of the annotated image, or to
        ``"No image to display."`` when an image query produced no drawing.
    """
    print("predict called")
    if not _chatbot:
        return _chatbot

    chat_query = _chatbot[-1][0]
    print("Chat query:", chat_query)

    is_image_query = isinstance(chat_query, tuple)
    if is_image_query:
        query = [{'image': chat_query[0]}]
    else:
        query = [{'text': _parse_text(chat_query)}]
    print("Query for model:", query)

    inputs = tokenizer.from_list_format(query)
    tokenized_inputs = tokenizer(inputs, return_tensors='pt')
    tokenized_inputs = tokenized_inputs.to(model.device)
    pred = model.generate(**tokenized_inputs)
    response = tokenizer.decode(pred.cpu()[0], skip_special_tokens=False)
    print("Model response:", response)

    if task_history:
        task_history[-1] = (task_history[-1][0], response)

    if not is_image_query:
        _chatbot[-1] = (chat_query, response)
        return _chatbot

    image = tokenizer.draw_bbox_on_latest_picture(response)
    if image is None:
        _chatbot[-1] = (chat_query, "No image to display.")
        return _chatbot

    # The drawing result is an in-memory object, not an upload exposing a
    # ``.name`` path, so save_image() cannot copy it. Persist it through its
    # own save() method, as the Qwen-VL usage example does.
    output_dir = Path(uploaded_file_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    image_path = output_dir / f"{secrets.token_hex(10)}.jpg"
    image.save(str(image_path))
    _chatbot[-1] = (chat_query, (str(image_path),))
    return _chatbot
def save_uploaded_image(image_file, upload_dir):
    """Save an in-memory image as a JPEG in a fresh random sub-directory.

    Args:
        image_file: Object with a ``save(path)`` method, or ``None``.
        upload_dir: Directory under which the random sub-directory is created.

    Returns:
        The path of the written file as a string, or ``None`` when
        ``image_file`` is ``None``.
    """
    if image_file is None:
        return None
    temp_dir = Path(upload_dir) / secrets.token_hex(20)
    temp_dir.mkdir(exist_ok=True, parents=True)
    filename = temp_dir / f"tmp{secrets.token_hex(5)}.jpg"
    image_file.save(str(filename))
    return str(filename)
def regenerate(_chatbot, task_history) -> list:
    """Discard the last reply and ask the model again.

    Args:
        _chatbot: Chatbot history; mutated in place.
        task_history: Model-side history; the last entry's response is reset
            to ``None``.

    Returns:
        The chatbot history after re-running ``predict``; returned unchanged
        when either history is empty or the last turn has no response yet.
    """
    # Both histories must have something to regenerate from; popping from an
    # empty chatbot would raise IndexError.
    if not task_history or not _chatbot:
        return _chatbot

    item = task_history[-1]
    if item[1] is None:
        return _chatbot
    task_history[-1] = (item[0], None)

    chatbot_item = _chatbot.pop(-1)
    if chatbot_item[0] is None:
        # Reply-only row: blank the reply of the row before it, if any.
        if _chatbot:
            _chatbot[-1] = (_chatbot[-1][0], None)
    else:
        _chatbot.append((chatbot_item[0], None))

    # predict() takes exactly (chatbot, task_history).
    return predict(_chatbot, task_history)
def add_text(history, task_history, text) -> tuple:
    """Append a user text message to both histories.

    The chatbot history receives the text rendered through ``_parse_text``.
    The task history receives the raw text, minus one trailing character
    when that character is in ``PUNCTUATION`` and the one before it is not.

    Returns:
        ``(history, task_history, "")`` as new lists; the empty string is
        the third output value.
    """
    drop_last_char = (
        len(text) >= 2
        and text[-1] in PUNCTUATION
        and text[-2] not in PUNCTUATION
    )
    task_text = text[:-1] if drop_last_char else text

    updated_history = history + [(_parse_text(text), None)]
    updated_task_history = task_history + [(task_text, None)]
    return updated_history, updated_task_history, ""
def add_file(history, task_history, file):
    """Append an uploaded file's path to both histories.

    Args:
        history: Chatbot history; not mutated.
        task_history: Model-side history; not mutated.
        file: Object whose ``name`` attribute is the file path, or ``None``.

    Returns:
        ``(history, task_history)`` with a ``((path,), None)`` entry appended
        to each, or the inputs unchanged when no file was uploaded.
    """
    if file is not None:
        file_path = file.name
        history = history + [((file_path,), None)]
        task_history = task_history + [((file_path,), None)]
    return history, task_history
def reset_user_input():
    """Return a Gradio update that sets a component's value to ``""``."""
    cleared = gr.update(value="")
    return cleared
def process_response(response: str) -> str:
    """Strip square brackets and ``BOX_TAG_PATTERN`` matches from a response."""
    without_open = response.replace("[", "")
    without_brackets = without_open.replace(r"]", "")
    return re.sub(BOX_TAG_PATTERN, "", without_brackets)
def process_history_for_model(task_history) -> list:
    """Convert task-history pairs into ``(query_dict, response)`` pairs.

    A tuple query becomes ``{'image': <first element>}``; anything else
    becomes ``{'text': <query>}``. A falsy response is replaced by ``""``.
    """
    return [
        (
            {'image': query[0]} if isinstance(query, tuple) else {'text': query},
            response or "",
        )
        for query, response in task_history
    ]
def reset_state(task_history) -> list:
    """Empty ``task_history`` in place and return a new empty list."""
    task_history.clear()
    fresh = []
    return fresh
def _launch_demo(args, model, tokenizer):
    """Build the Gradio interface and start serving it.

    Args:
        args: Parsed CLI options; ``share`` and ``inbrowser`` are forwarded
            to ``launch()``.
        model: Loaded model. Not used directly here; the event handlers read
            the module-level ``model``.
        tokenizer: Loaded tokenizer. Not used directly here; the event
            handlers read the module-level ``tokenizer``.
    """
    with gr.Blocks() as demo:
        gr.Markdown("""
# 🙋🏻♂️欢迎来到🌟Tonic 的🦄Qwen-VL-Chat🤩Bot!🚀
# 🙋🏻♂️Welcome toTonic's Qwen-VL-Chat Bot!
该WebUI基于Qwen-VL-Chat,实现聊天机器人功能。 但我必须解决它的很多问题,也许我也能获得一些荣誉。
Qwen-VL-Chat 是一种多模式输入模型。 您可以使用此空间来测试当前模型 [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat) 您也可以使用 🧑🏻🚀qwen/Qwen-VL -通过克隆这个空间来聊天🚀。 🧬🔬🔍 只需点击这里:[重复空间](https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true)
加入我们:🌟TeamTonic🌟总是在制作很酷的演示! 在 👻Discord 上加入我们活跃的构建者🛠️社区:[Discord](https://discord.gg/nXx5wbX9) 在 🤗Huggingface 上:[TeamTonic](https://huggingface.co/TeamTonic) 和 [MultiTransformer](https://huggingface.co/MultiTransformer) 在 🌐Github 上:[Polytonic](https://github.com/tonic-ai) 并为 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha) 做出贡献 )
This WebUI is based on Qwen-VL-Chat, implementing chatbot functionalities. Qwen-VL-Chat is a multimodal input model. You can use this Space to test out the current model [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat) You can also use qwen/Qwen-VL-Chat🚀 by cloning this space. Simply click here: [Duplicate Space](https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true)
Join us: TeamTonic is always making cool demos! Join our active builder's community on Discord: [Discord](https://discord.gg/nXx5wbX9) On Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On Github: [Polytonic](https://github.com/tonic-ai) & contribute to [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)
""")
        with gr.Row():
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(label='Qwen-VL-Chat')
            with gr.Column(scale=1):
                with gr.Row():
                    query = gr.Textbox(lines=2, label='Input', placeholder="Type your message here...")
                    submit_btn = gr.Button("🚀 Submit")
                with gr.Row():
                    file_upload = gr.UploadButton("📁 Upload Image", file_types=["image"])
                    submit_file_btn = gr.Button("Submit Image")
                regen_btn = gr.Button("🤔️ Regenerate")
                empty_bin = gr.Button("🧹 Clear History")
        task_history = gr.State([])

        # Submitting text (button or textbox) first records the message and
        # clears the input box, then runs the model on it. Running predict()
        # alone would answer whatever entry happened to be last in the chat.
        submit_btn.click(
            fn=add_text,
            inputs=[chatbot, task_history, query],
            outputs=[chatbot, task_history, query]
        ).then(
            fn=predict,
            inputs=[chatbot, task_history],
            outputs=[chatbot]
        )
        query.submit(
            fn=add_text,
            inputs=[chatbot, task_history, query],
            outputs=[chatbot, task_history, query]
        ).then(
            fn=predict,
            inputs=[chatbot, task_history],
            outputs=[chatbot]
        )
        submit_file_btn.click(
            fn=handle_image_submission,
            inputs=[chatbot, task_history, file_upload],
            outputs=[chatbot, task_history]
        )
        regen_btn.click(
            fn=regenerate,
            inputs=[chatbot, task_history],
            outputs=[chatbot]
        )
        # reset_state() empties task_history in place and returns [], which
        # is the new (empty) content for the visible chat window.
        empty_bin.click(
            fn=reset_state,
            inputs=[task_history],
            outputs=[chatbot],
        )
        gr.Markdown("""
注意:此演示受 Qwen-VL 原始许可证的约束。我们强烈建议用户不要故意生成或允许他人故意生成有害内容,
包括仇恨言论、暴力、色情、欺骗等。(注:本演示受Qwen-VL许可协议约束,强烈建议用户不要传播或允许他人传播以下内容,包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息 .)
Note: This demo is governed by the original license of Qwen-VL. We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content,
including hate speech, violence, pornography, deception, etc. (Note: This demo is subject to the license agreement of Qwen-VL. We strongly advise users not to disseminate or allow others to disseminate the following content, including but not limited to hate speech, violence, pornography, and fraud-related harmful information.)
""")

    # NOTE(review): --server-name / --server-port are parsed but not
    # forwarded here. Their CLI defaults (127.0.0.1:8000) would override
    # Gradio's own defaults for every run; confirm the intended deployment
    # before wiring them in.
    demo.queue().launch(share=args.share, inbrowser=args.inbrowser)
def main():
    """Parse CLI options, load the model and tokenizer, and launch the demo."""
    cli_args = _get_args()
    loaded_model, loaded_tokenizer = _load_model_tokenizer(cli_args)
    _launch_demo(cli_args, loaded_model, loaded_tokenizer)
# Start the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()