|
|
|
"""Gradio web demo for Qwen-VL-Chat, a multimodal (image + text) chat model."""

import os
import re
import secrets
import tempfile
from argparse import ArgumentParser, Namespace
from pathlib import Path

import gradio as gr
from modelscope import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, snapshot_download
|
|
|
|
|
# Expose the first two GPUs to the demo; harmless when running with --cpu-only.
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'
|
DEFAULT_CKPT_PATH = 'qwen/Qwen-VL-Chat' |
|
REVISION = 'v1.0.4' |
|
# Qwen-VL wraps grounded bounding boxes in <box>...</box> tags; this pattern is used to strip them.
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"
|
# Full-width and ASCII punctuation; a single trailing mark is trimmed from queries in add_text.
PUNCTUATION = "！？｡。＂＃＄％＆＇（）＊＋，－／：；＜＝＞＠［＼］＾＿｀｛｜｝～｟｠｢｣､、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
|
# Uploaded files go to Gradio's temp dir unless GRADIO_TEMP_DIR overrides it.
uploaded_file_dir = os.environ.get("GRADIO_TEMP_DIR") or str(Path(tempfile.gettempdir()) / "gradio")
|
tokenizer = None |
|
model = None |
|
|
|
def _get_args() -> Namespace:
    """Parse command-line arguments for the demo."""
    parser = ArgumentParser()
    parser.add_argument("-c", "--checkpoint-path", type=str, default=DEFAULT_CKPT_PATH,
                        help="Checkpoint name or path, default to %(default)r")
    parser.add_argument("--revision", type=str, default=REVISION)
    parser.add_argument("--cpu-only", action="store_true", help="Run demo with CPU only")
    parser.add_argument("--share", action="store_true", default=False,
                        help="Create a publicly shareable link for the interface.")
    parser.add_argument("--inbrowser", action="store_true", default=False,
                        help="Automatically launch the interface in a new tab on the default browser.")
    parser.add_argument("--server-port", type=int, default=8000,
                        help="Demo server port.")
    parser.add_argument("--server-name", type=str, default="127.0.0.1",
                        help="Demo server name.")

    args = parser.parse_args()
    return args
|
|
|
def handle_image_submission(_chatbot, task_history, file) -> tuple: |
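    """Save an uploaded image, append it to both histories, and run the model on it."""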
|
print("handle_image_submission called") |
|
if file is None: |
|
print("No file uploaded") |
|
return _chatbot, task_history |
|
print("File received:", file) |
|
file_path = save_image(file, uploaded_file_dir) |
|
print("File saved at:", file_path) |
|
history_item = ((file_path,), None) |
|
_chatbot.append(history_item) |
|
task_history.append(history_item) |
|
return predict(_chatbot, task_history, tokenizer, model) |
|
|
|
|
|
def _load_model_tokenizer(args) -> tuple: |
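    """Download the checkpoint from ModelScope, then load tokenizer, model, and generation config."""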
|
    global tokenizer, model
    model_id = args.checkpoint_path
    model_dir = snapshot_download(model_id, revision=args.revision)
    tokenizer = AutoTokenizer.from_pretrained(
        model_dir, trust_remote_code=True, resume_download=True,
    )

    if args.cpu_only:
        device_map = "cpu"
    else:
        device_map = "auto"

    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        device_map=device_map,
        trust_remote_code=True,
        bf16=True,
        resume_download=True,
    ).eval()
    model.generation_config = GenerationConfig.from_pretrained(
        model_dir, trust_remote_code=True, resume_download=True,
    )

    return model, tokenizer
|
|
|
|
|
def _parse_text(text: str) -> str: |
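    """Convert model output to chatbot HTML: turn code fences into <pre><code> and escape text inside them."""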
|
    lines = text.split("\n")
    lines = [line for line in lines if line != ""]
    count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            count += 1
            items = line.split("`")
            if count % 2 == 1:
                lines[i] = f'<pre><code class="language-{items[-1]}">'
            else:
                lines[i] = "<br></code></pre>"
        else:
            if i > 0:
                if count % 2 == 1:
                    line = line.replace("`", r"\`")
                    line = line.replace("<", "&lt;")
                    line = line.replace(">", "&gt;")
                    line = line.replace(" ", "&nbsp;")
                    line = line.replace("*", "&ast;")
                    line = line.replace("_", "&lowbar;")
                    line = line.replace("-", "&#45;")
                    line = line.replace(".", "&#46;")
                    line = line.replace("!", "&#33;")
                    line = line.replace("(", "&#40;")
                    line = line.replace(")", "&#41;")
                    line = line.replace("$", "&#36;")
                lines[i] = "<br>" + line
    text = "".join(lines)
    return text
|
|
|
def save_image(image_file, upload_dir: str) -> str: |
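    """Copy an uploaded file into upload_dir under a random name and return the new path."""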
|
print("save_image called with:", image_file) |
|
Path(upload_dir).mkdir(parents=True, exist_ok=True) |
|
filename = secrets.token_hex(10) + Path(image_file.name).suffix |
|
file_path = Path(upload_dir) / filename |
|
print("Saving to:", file_path) |
|
with open(image_file.name, "rb") as f_input, open(file_path, "wb") as f_output: |
|
f_output.write(f_input.read()) |
|
return str(file_path) |
|
|
|
|
|
|
|
|
|
|
def predict(_chatbot, task_history) -> list: |
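    """Generate a response for the latest chatbot entry and write it back into the chat."""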
|
print("predict called") |
|
if not _chatbot: |
|
return _chatbot |
|
chat_query = _chatbot[-1][0] |
|
print("Chat query:", chat_query) |
|
|
|
if isinstance(chat_query, tuple): |
|
query = [{'image': chat_query[0]}] |
|
else: |
|
query = [{'text': _parse_text(chat_query)}] |
|
|
|
print("Query for model:", query) |
|
inputs = tokenizer.from_list_format(query) |
|
tokenized_inputs = tokenizer(inputs, return_tensors='pt') |
|
tokenized_inputs = tokenized_inputs.to(model.device) |
|
|
|
pred = model.generate(**tokenized_inputs) |
|
response = tokenizer.decode(pred.cpu()[0], skip_special_tokens=False) |
|
print("Model response:", response) |
|
if 'image' in query[0]: |
|
image = tokenizer.draw_bbox_on_latest_picture(response) |
|
if image is not None: |
|
image_path = save_image(image, uploaded_file_dir) |
|
_chatbot[-1] = (chat_query, (image_path,)) |
|
else: |
|
_chatbot[-1] = (chat_query, "No image to display.") |
|
else: |
|
_chatbot[-1] = (chat_query, response) |
|
return _chatbot |
|
|
|
def save_uploaded_image(image, upload_dir: str) -> str:
    """Save a PIL image under a unique temp directory and return its path."""
    if image is None:
        return None
    temp_dir = Path(upload_dir) / secrets.token_hex(20)
    temp_dir.mkdir(exist_ok=True, parents=True)
    name = f"tmp{secrets.token_hex(5)}.jpg"
    filename = temp_dir / name
    image.save(str(filename))
    return str(filename)
|
|
|
def regenerate(_chatbot, task_history) -> list: |
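    """Discard the last response and regenerate it from the last query."""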
|
    if not task_history:
        return _chatbot
    item = task_history[-1]
    if item[1] is None:
        return _chatbot
    task_history[-1] = (item[0], None)
    chatbot_item = _chatbot.pop(-1)
    if chatbot_item[0] is None:
        _chatbot[-1] = (_chatbot[-1][0], None)
    else:
        _chatbot.append((chatbot_item[0], None))
    return predict(_chatbot, task_history)
|
|
|
def add_text(history, task_history, text) -> tuple: |
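    """Append user text to both histories, trimming one trailing punctuation mark from the model-side copy."""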
|
    task_text = text
    if len(text) >= 2 and text[-1] in PUNCTUATION and text[-2] not in PUNCTUATION:
        task_text = text[:-1]
    history = history + [(_parse_text(text), None)]
    task_history = task_history + [(task_text, None)]
    return history, task_history, ""
|
|
|
def add_file(history, task_history, file): |
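    """Record an uploaded file in both histories without copying it."""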
|
    if file is None:
        return history, task_history
    file_path = file.name
    history = history + [((file_path,), None)]
    task_history = task_history + [((file_path,), None)]
    return history, task_history
|
|
|
def reset_user_input():
    return gr.update(value="")
|
|
|
def process_response(response: str) -> str: |
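    """Strip <ref> tags and <box> spans from a model response."""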
|
    response = response.replace("<ref>", "").replace("</ref>", "")
    response = re.sub(BOX_TAG_PATTERN, "", response)
    return response
|
|
|
def process_history_for_model(task_history) -> list: |
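    """Convert (query, response) pairs into the list format used by Qwen-VL's tokenizer."""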
|
    processed_history = []
    for query, response in task_history:
        if isinstance(query, tuple):
            query = {'image': query[0]}
        else:
            query = {'text': query}
        response = response or ""
        processed_history.append((query, response))
    return processed_history
|
|
|
def reset_state(task_history) -> tuple:
    """Clear both the model-side history and the visible chat."""
    task_history.clear()
    return [], task_history
|
|
|
|
|
def _launch_demo(args, model, tokenizer): |
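    """Build the Gradio UI and wire up the event handlers."""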
|
|
|
|
    with gr.Blocks() as demo:
        gr.Markdown("""
# 🙋🏻‍♂️ Welcome to Tonic's 🦄 Qwen-VL-Chat 🤩 Bot! 🚀

This WebUI is based on Qwen-VL-Chat, implementing chatbot functionalities (though I had to fix quite a few of its issues, so perhaps I can claim some credit too). Qwen-VL-Chat is a multimodal input model. You can use this Space to test out the current model [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat). You can also use qwen/Qwen-VL-Chat🚀 by cloning this space. Simply click here: [Duplicate Space](https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true)

Join us: 🌟TeamTonic🌟 is always making cool demos! Join our active builder's 🛠️ community on 👻 Discord: [Discord](https://discord.gg/nXx5wbX9) On 🤗 Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐 Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)
""")
|
        with gr.Row():
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(label='Qwen-VL-Chat')
            with gr.Column(scale=1):
                with gr.Row():
                    query = gr.Textbox(lines=2, label='Input', placeholder="Type your message here...")
                    submit_btn = gr.Button("🚀 Submit")
                with gr.Row():
                    file_upload = gr.UploadButton("📁 Upload Image", file_types=["image"])
                    submit_file_btn = gr.Button("Submit Image")
                    regen_btn = gr.Button("🤔 Regenerate")
                    empty_bin = gr.Button("🧹 Clear History")
        task_history = gr.State([])
|
|
|
        # Submitting text first records it in both histories, then runs the model.
        submit_btn.click(
            fn=add_text,
            inputs=[chatbot, task_history, query],
            outputs=[chatbot, task_history, query],
        ).then(
            fn=predict,
            inputs=[chatbot, task_history],
            outputs=[chatbot],
        )

        submit_file_btn.click(
            fn=handle_image_submission,
            inputs=[chatbot, task_history, file_upload],
            outputs=[chatbot, task_history],
        )

        regen_btn.click(
            fn=regenerate,
            inputs=[chatbot, task_history],
            outputs=[chatbot],
        )

        # Clearing resets both the visible chat and the model-side history.
        empty_bin.click(
            fn=reset_state,
            inputs=[task_history],
            outputs=[chatbot, task_history],
        )

        # Pressing Enter in the textbox behaves like the Submit button.
        query.submit(
            fn=add_text,
            inputs=[chatbot, task_history, query],
            outputs=[chatbot, task_history, query],
        ).then(
            fn=predict,
            inputs=[chatbot, task_history],
            outputs=[chatbot],
        )
|
|
|
gr.Markdown(""" |
|
注意:此演示受 Qwen-VL 原始许可证的约束。我们强烈建议用户不要故意生成或允许他人故意生成有害内容, |
|
包括仇恨言论、暴力、色情、欺骗等。(注:本演示受Qwen-VL许可协议约束,强烈建议用户不要传播或允许他人传播以下内容,包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息 .) |
|
Note: This demo is governed by the original license of Qwen-VL. We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, |
|
including hate speech, violence, pornography, deception, etc. (Note: This demo is subject to the license agreement of Qwen-VL. We strongly advise users not to disseminate or allow others to disseminate the following content, including but not limited to hate speech, violence, pornography, and fraud-related harmful information.) |
|
""") |
|
|
|
demo.queue().launch() |
|
|
|
|
|
def main():
    args = _get_args()
    model, tokenizer = _load_model_tokenizer(args)
    _launch_demo(args, model, tokenizer)
|
|
|
if __name__ == '__main__':
    main()