from argparse import ArgumentParser, Namespace
from pathlib import Path
import os
import re
import secrets
import tempfile

import gradio as gr
from modelscope import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, snapshot_download

# Global variables
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'
DEFAULT_CKPT_PATH = 'qwen/Qwen-VL-Chat'
REVISION = 'v1.0.4'
BOX_TAG_PATTERN = r"<box>([\s\S]*?)</box>"
PUNCTUATION = "！？。＂＃＄％＆＇（）＊＋，－／：；＜＝＞＠［＼］＾＿｀｛｜｝～｟｠｢｣､、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
uploaded_file_dir = os.environ.get("GRADIO_TEMP_DIR") or str(Path(tempfile.gettempdir()) / "gradio")
tokenizer = None
model = None


def _get_args() -> Namespace:
    parser = ArgumentParser()
    parser.add_argument("-c", "--checkpoint-path", type=str, default=DEFAULT_CKPT_PATH,
                        help="Checkpoint name or path, default to %(default)r")
    parser.add_argument("--revision", type=str, default=REVISION)
    parser.add_argument("--cpu-only", action="store_true", help="Run demo with CPU only")
    parser.add_argument("--share", action="store_true", default=False,
                        help="Create a publicly shareable link for the interface.")
    parser.add_argument("--inbrowser", action="store_true", default=False,
                        help="Automatically launch the interface in a new tab on the default browser.")
    parser.add_argument("--server-port", type=int, default=8000, help="Demo server port.")
    parser.add_argument("--server-name", type=str, default="127.0.0.1", help="Demo server name.")
    args = parser.parse_args()
    return args


def handle_image_submission(_chatbot, task_history, file) -> tuple:
    print("handle_image_submission called")
    if file is None:
        print("No file uploaded")
        return _chatbot, task_history
    print("File received:", file)
    file_path = save_image(file, uploaded_file_dir)
    print("File saved at:", file_path)
    history_item = ((file_path,), None)
    _chatbot.append(history_item)
    task_history.append(history_item)
    return predict(_chatbot, task_history), task_history


def _load_model_tokenizer(args) -> tuple:
    global tokenizer, model
    model_id = args.checkpoint_path
    model_dir = snapshot_download(model_id, revision=args.revision)
    tokenizer = AutoTokenizer.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )
    if args.cpu_only:
        device_map = "cpu"
    else:
        device_map = "auto"
    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        device_map=device_map,
        trust_remote_code=True,
        bf16=True,
        resume_download=True,
    ).eval()
    model.generation_config = GenerationConfig.from_pretrained(
        model_dir,
        trust_remote_code=True,
        resume_download=True,
    )
    return model, tokenizer


def _parse_text(text: str) -> str:
    """Convert markdown-ish chat text into HTML the Gradio chatbot can render."""
    lines = text.split("\n")
    lines = [line for line in lines if line != ""]
    count = 0
    for i, line in enumerate(lines):
        if "```" in line:
            count += 1
            items = line.split("`")
            if count % 2 == 1:
                lines[i] = f'<pre><code class="language-{items[-1]}">'
            else:
                lines[i] = "<br></code></pre>"
        else:
            if i > 0:
                if count % 2 == 1:
                    # Inside a code fence: escape characters to HTML entities
                    # so they display literally instead of being rendered.
                    line = line.replace("`", r"\`")
                    line = line.replace("<", "&lt;")
                    line = line.replace(">", "&gt;")
                    line = line.replace(" ", "&nbsp;")
                    line = line.replace("*", "&ast;")
                    line = line.replace("_", "&lowbar;")
                    line = line.replace("-", "&#45;")
                    line = line.replace(".", "&#46;")
                    line = line.replace("!", "&#33;")
                    line = line.replace("(", "&#40;")
                    line = line.replace(")", "&#41;")
                    line = line.replace("$", "&#36;")
                lines[i] = "<br>" + line
    text = "".join(lines)
    return text
" + line text = "".join(lines) return text def save_image(image_file, upload_dir: str) -> str: print("save_image called with:", image_file) Path(upload_dir).mkdir(parents=True, exist_ok=True) filename = secrets.token_hex(10) + Path(image_file.name).suffix file_path = Path(upload_dir) / filename print("Saving to:", file_path) with open(image_file.name, "rb") as f_input, open(file_path, "wb") as f_output: f_output.write(f_input.read()) return str(file_path) def add_file(history, task_history, file): if file is None: return history, task_history file_path = save_image(file) history = history + [((file_path,), None)] task_history = task_history + [((file_path,), None)] return history, task_history def predict(_chatbot, task_history) -> list: print("predict called") if not _chatbot: return _chatbot chat_query = _chatbot[-1][0] print("Chat query:", chat_query) if isinstance(chat_query, tuple): query = [{'image': chat_query[0]}] else: query = [{'text': _parse_text(chat_query)}] print("Query for model:", query) inputs = tokenizer.from_list_format(query) tokenized_inputs = tokenizer(inputs, return_tensors='pt') tokenized_inputs = tokenized_inputs.to(model.device) pred = model.generate(**tokenized_inputs) response = tokenizer.decode(pred.cpu()[0], skip_special_tokens=False) print("Model response:", response) if 'image' in query[0]: image = tokenizer.draw_bbox_on_latest_picture(response) if image is not None: image_path = save_image(image, uploaded_file_dir) _chatbot[-1] = (chat_query, (image_path,)) else: _chatbot[-1] = (chat_query, "No image to display.") else: _chatbot[-1] = (chat_query, response) return _chatbot def save_uploaded_image(image_file, upload_dir): if image is None: return None temp_dir = secrets.token_hex(20) temp_dir = Path(uploaded_file_dir) / temp_dir temp_dir.mkdir(exist_ok=True, parents=True) name = f"tmp{secrets.token_hex(5)}.jpg" filename = temp_dir / name image.save(str(filename)) return str(filename) def regenerate(_chatbot, task_history) -> list: if not task_history: return _chatbot item = task_history[-1] if item[1] is None: return _chatbot task_history[-1] = (item[0], None) chatbot_item = _chatbot.pop(-1) if chatbot_item[0] is None: _chatbot[-1] = (_chatbot[-1][0], None) else: _chatbot.append((chatbot_item[0], None)) return predict(_chatbot, task_history, tokenizer, model) def add_text(history, task_history, text) -> tuple: task_text = text if len(text) >= 2 and text[-1] in PUNCTUATION and text[-2] not in PUNCTUATION: task_text = text[:-1] history = history + [(_parse_text(text), None)] task_history = task_history + [(task_text, None)] return history, task_history, "" def add_file(history, task_history, file): if file is None: return history, task_history # Return if no file is uploaded file_path = file.name history = history + [((file.name,), None)] task_history = task_history + [((file.name,), None)] return history, task_history def reset_user_input(): return gr.update(value="") def process_response(response: str) -> str: response = response.replace("", "").replace(r"", "") response = re.sub(BOX_TAG_PATTERN, "", response) return response def process_history_for_model(task_history) -> list: processed_history = [] for query, response in task_history: if isinstance(query, tuple): query = {'image': query[0]} else: query = {'text': query} response = response or "" processed_history.append((query, response)) return processed_history def reset_state(task_history) -> list: task_history.clear() return [] def _launch_demo(args, model, tokenizer): uploaded_file_dir = 
os.environ.get("GRADIO_TEMP_DIR") or str( Path(tempfile.gettempdir()) / "gradio" ) with gr.Blocks() as demo: gr.Markdown(""" # 🙋🏻‍♂️欢迎来到🌟Tonic 的🦄Qwen-VL-Chat🤩Bot!🚀 # 🙋🏻‍♂️Welcome toTonic's Qwen-VL-Chat Bot! 该WebUI基于Qwen-VL-Chat,实现聊天机器人功能。 但我必须解决它的很多问题,也许我也能获得一些荣誉。 Qwen-VL-Chat 是一种多模式输入模型。 您可以使用此空间来测试当前模型 [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat) 您也可以使用 🧑🏻‍🚀qwen/Qwen-VL -通过克隆这个空间来聊天🚀。 🧬🔬🔍 只需点击这里:[重复空间](https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true) 加入我们:🌟TeamTonic🌟总是在制作很酷的演示! 在 👻Discord 上加入我们活跃的构建者🛠️社区:[Discord](https://discord.gg/nXx5wbX9) 在 🤗Huggingface 上:[TeamTonic](https://huggingface.co/TeamTonic) 和 [MultiTransformer](https:/ /huggingface.co/MultiTransformer) 在 🌐Github 上:[Polytonic](https://github.com/tonic-ai) 并为 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha) 做出贡献 ) This WebUI is based on Qwen-VL-Chat, implementing chatbot functionalities. Qwen-VL-Chat is a multimodal input model. You can use this Space to test out the current model [qwen/Qwen-VL-Chat](https://huggingface.co/qwen/Qwen-VL-Chat) You can also use qwen/Qwen-VL-Chat🚀 by cloning this space. Simply click here: [Duplicate Space](https://huggingface.co/spaces/Tonic1/VLChat?duplicate=true) Join us: TeamTonic is always making cool demos! Join our active builder's community on Discord: [Discord](https://discord.gg/nXx5wbX9) On Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On Github: [Polytonic](https://github.com/tonic-ai) & contribute to [PolyGPT](https://github.com/tonic-ai/polygpt-alpha) """) with gr.Row(): with gr.Column(scale=1): chatbot = gr.Chatbot(label='Qwen-VL-Chat') with gr.Column(scale=1): with gr.Row(): query = gr.Textbox(lines=2, label='Input', placeholder="Type your message here...") submit_btn = gr.Button("🚀 Submit") with gr.Row(): file_upload = gr.UploadButton("📁 Upload Image", file_types=["image"]) submit_file_btn = gr.Button("Submit Image") regen_btn = gr.Button("🤔️ Regenerate") empty_bin = gr.Button("🧹 Clear History") task_history = gr.State([]) submit_btn.click( fn=predict, inputs=[chatbot, task_history], outputs=[chatbot] ) submit_file_btn.click( fn=handle_image_submission, inputs=[chatbot, task_history, file_upload], outputs=[chatbot, task_history] ) regen_btn.click( fn=regenerate, inputs=[chatbot, task_history], outputs=[chatbot] ) empty_bin.click( fn=reset_state, inputs=[task_history], outputs=[task_history], ) query.submit( fn=add_text, inputs=[chatbot, task_history, query], outputs=[chatbot, task_history, query] ) gr.Markdown(""" 注意:此演示受 Qwen-VL 原始许可证的约束。我们强烈建议用户不要故意生成或允许他人故意生成有害内容, 包括仇恨言论、暴力、色情、欺骗等。(注:本演示受Qwen-VL许可协议约束,强烈建议用户不要传播或允许他人传播以下内容,包括但不限于仇恨言论、暴力、色情、欺诈相关的有害信息 .) Note: This demo is governed by the original license of Qwen-VL. We strongly advise users not to knowingly generate or allow others to knowingly generate harmful content, including hate speech, violence, pornography, deception, etc. (Note: This demo is subject to the license agreement of Qwen-VL. We strongly advise users not to disseminate or allow others to disseminate the following content, including but not limited to hate speech, violence, pornography, and fraud-related harmful information.) """) demo.queue().launch() def main(): args = _get_args() model, tokenizer = _load_model_tokenizer(args) _launch_demo(args, model, tokenizer) if __name__ == '__main__': main()