multiprocess_inference长时间运行报错SystemError: null argument to internal routine

#4
by waterpeople - opened

当我把程序的prompt用describe this with a word,image就用test.jpg重复运行,每次运行一段时间都会出现SystemError: null argument to internal routine。
并且我测试
handle:: c_void_p(553595072)
rkllm_input:: <rkllm_binding.RKLLMInput object at 0xffff64014340>
infer_param:: <rkllm_binding.RKLLMInferParam object at 0xffff641bdac0>
这几个参数都不为空。这是为什么呢?

import os
import time
import signal
from multiprocessing import Process, Queue, Event
import cv2
import numpy as np
from rkllm_binding import *
from rknnlite.api.rknn_lite import RKNNLite
from icecream import ic
import logging

logging.basicConfig(
filename="program.log",
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)

视觉编码器进程

def vision_encoder_process(load_ready_queue, embedding_queue, img_path_queue, start_event):

VISION_ENCODER_PATH = "/home/orangepi/Desktop/MiniCPM-V-2_6-rkllm-sex-student/vision_transformer.rknn"
img_size = 448

# 初始化视觉编码器
vision_encoder = RKNNLite(verbose=False)
model_size = os.path.getsize(VISION_ENCODER_PATH)
print(f"Start loading vision encoder model (size: {model_size / 1024 / 1024:.2f} MB)")
start_time = time.time()
vision_encoder.load_rknn(VISION_ENCODER_PATH)
end_time = time.time()
print(f"Vision encoder loaded in {end_time - start_time:.2f} seconds")
vision_encoder.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)

# 通知主进程加载完成
load_ready_queue.put("vision_ready")

# 等待开始信号
start_event.wait()

def process_image(img_path, vision_encoder):
    img = cv2.imread(img_path)
    if img is None:
        return None
    print("Start vision inference...")
    img = cv2.resize(img, (img_size, img_size))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32)
    img = img[np.newaxis, :, :, :]
    
    start_time = time.time()
    image_embeddings = vision_encoder.inference(inputs=[img], data_format="nhwc")[0].astype(np.float32)
    end_time = time.time()
    print(f"Vision encoder inference time: {end_time - start_time:.2f} seconds")
    return image_embeddings

while True:
    img_path = img_path_queue.get()
    # logging.debug("Image path dequeued: %s", img_path)
    if img_path == "STOP":
        break
    embeddings = process_image(img_path, vision_encoder)
    if embeddings is not None:
        embedding_queue.put(embeddings)
        logging.debug("vision_encoder::Embedding enqueued successfully.")
    else:
        embedding_queue.put("ERROR")
        logging.error("Embedding processing failed.")

LLM进程

def llm_process(load_ready_queue, embedding_queue, prompt_queue, inference_done_queue, start_event):

MODEL_PATH = "/home/orangepi/Desktop/MiniCPM-V-2_6-rkllm-sex-student/qwen.rkllm"
handle = None

def signal_handler(signal, frame):
    print("Ctrl-C pressed, exiting...")
    global handle
    if handle:
        abort(handle)
        destroy(handle)
    exit(0)

signal.signal(signal.SIGINT, signal_handler)
os.environ["RKLLM_LOG_LEVEL"] = "0"

inference_count = 0
inference_start_time = 0
def result_callback(result, userdata, state):
    nonlocal inference_start_time, inference_count
    if state == LLMCallState.RKLLM_RUN_NORMAL:
        if inference_count == 0:
            first_token_time = time.time()
            # print(f"Time to first token: {first_token_time - inference_start_time:.2f} seconds")
        inference_count += 1
        print(result.contents.text.decode(), end="", flush=True)
    elif state == LLMCallState.RKLLM_RUN_FINISH:
        print("\n\n(finished)")
        inference_done_queue.put("DONE")
    elif state == LLMCallState.RKLLM_RUN_ERROR:
        print("\nError occurred during LLM call")
        inference_done_queue.put("ERROR")

# 初始化LLM
param = create_default_param()
param.model_path = MODEL_PATH.encode()
param.img_start = "<image>".encode()
param.img_end = "</image>".encode()
param.img_content = "<unk>".encode()
extend_param = RKLLMExtendParam()
extend_param.base_domain_id = 1
param.extend_param = extend_param

model_size = os.path.getsize(MODEL_PATH)
print(f"Start loading language model (size: {model_size / 1024 / 1024:.2f} MB)")
start_time = time.time()
handle = init(param, result_callback)
end_time = time.time()
print(f"Language model loaded in {end_time - start_time:.2f} seconds")

# 通知主进程加载完成
load_ready_queue.put("llm_ready")

# 创建推理参数
infer_param = RKLLMInferParam()
infer_param.mode = RKLLMInferMode.RKLLM_INFER_GENERATE.value

while True:
    prompt = prompt_queue.get()
    if prompt == "STOP":
        break
        
    image_embeddings = embedding_queue.get()
    logging.debug("llm::Embedding dequeued successfully.")
    if isinstance(image_embeddings, str) and image_embeddings == "ERROR":
        print("Error processing image")
        continue
    
    rkllm_input = create_rkllm_input(RKLLMInputType.RKLLM_INPUT_MULTIMODAL,
                                    prompt=prompt,
                                    image_embed=image_embeddings)

    print("handle::",handle)
    print("rkllm_input::",rkllm_input)
    print("infer_param::",infer_param)
    
    inference_start_time = time.time()
    
    run(handle, rkllm_input, infer_param, None)
        
# 清理
abort(handle)
destroy(handle)

PROMPT_TEXT_PREFIX = "<|im_start|>system You are a helpful assistant. <|im_end|> <|im_start|>user"
PROMPT_TEXT_POSTFIX = "<|im_end|><|im_start|>assistant"

def main():
load_ready_queue = Queue()
embedding_queue = Queue()
img_path_queue = Queue()
prompt_queue = Queue()
inference_done_queue = Queue()
start_event = Event()

vision_process = Process(target=vision_encoder_process,
                       args=(load_ready_queue, embedding_queue, img_path_queue, start_event))
lm_process = Process(target=llm_process,
                    args=(load_ready_queue, embedding_queue, prompt_queue, inference_done_queue, start_event))

vision_process.start()
lm_process.start()

# 等待模型加载
ready_count = 0
while ready_count < 2:
    status = load_ready_queue.get()
    print(f"Received ready signal: {status}")
    ready_count += 1

print("All models loaded, starting interactive mode...")
start_event.set()

# 交互循环
try:
    while True:
        
        input_str = "describe this <image> with a word"
        prompt = PROMPT_TEXT_PREFIX + input_str + PROMPT_TEXT_POSTFIX
    
        img_path = "/home/orangepi/Desktop/MiniCPM-V-2_6-rkllm-sex-student/test.jpg"
        logging.info("Processing image: %s with prompt: %s", img_path, prompt)
        
        img_path_queue.put(img_path)
        prompt_queue.put(prompt)
        
        # 等待推理完成
        status = inference_done_queue.get()
        
        if status == "ERROR":
            print("Inference failed")
        
except KeyboardInterrupt:
    print("\nExiting...")
    img_path_queue.put("STOP")
    prompt_queue.put("STOP")

vision_process.join()
lm_process.join()

if name == "main":
main()
这是完整代码

Sign up or log in to comment