import gradio as gr from PIL import Image from io import BytesIO import requests import time import openai import os from openai import OpenAI from dotenv import load_dotenv load_dotenv() client = OpenAI( # This is the default and can be omitted api_key = os.getenv("OPENAI_API_KEY") #api_key=os.environ.get("OPENAI_API_KEY"), ) # 设置OpenAI API密钥 openai.api_key = "sk-ut4AJ8UzqsoTVaeIATrXBGfLRcYhZbba04jnSZyOwlT3BlbkFJsEa4UxwYuflmBYBi2oLQnZJIB8-zCzTWJ9QQLMzt8A" engine = "gpt-4o-mini" class ImageProcessor: def __init__(self, api_key): self.api_key = api_key self.submit_url = 'https://test.aitanzou.com/web/api/task/submit' self.result_url = 'https://test.aitanzou.com/web/api/getResult' self.headers = { 'API-Key': self.api_key } def submit_images(self, image_bytes_list): files = [('images', ('image.png', image_bytes, 'image/png')) for image_bytes in image_bytes_list] response = requests.post(self.submit_url, headers=self.headers, files=files) if response.status_code == 200: data = response.json() if 'data' in data and 'taskId' in data['data']: task_id = data['data']['taskId'] return task_id else: raise Exception(f'Unexpected response format: {data}') else: raise Exception(f'Error: {response.status_code}, {response.text}') def get_result(self, task_id): params = {'taskId': task_id} while True: result_response = requests.get(self.result_url, params=params) if result_response.status_code == 200: result_data = result_response.json() if 'data' in result_data and 'abcPath' in result_data['data']: if result_data['data']['abcPath'] is None: print('Task is still pending...') time.sleep(10) else: url = result_data['data']['abcPath'] response = requests.get(url) if response.status_code == 200: return response.text else: raise Exception(f'Error retrieving file content: {response.status_code}, {response.text}') else: raise Exception(f'Unexpected result format: {result_data}') else: raise Exception(f'Error: {result_response.status_code}, {result_response.text}') def process_images(self, image_bytes_list): task_id = self.submit_images(image_bytes_list) return self.get_result(task_id) # 设置Music API密钥 api_key = 'ddc85b14-bd83-4757-9bc4-8a11194da536' image_processor = ImageProcessor(api_key) # 定义处理函数 def process_input(text=None, images=None, audio=None): # 创建GPT请求的描述 system = "1.你是一个专业的钢琴音乐教师,只能回答音乐知识,回复的内容为普通文本格式。如果提供的乐谱是abc记谱法,则回复时不要用abc记谱法,需要转换为传统的普通记谱法使用专业词汇进行回答问题2.你将根据下面指令回答问题,但是不能违反第一条指令,也不能在回复中提及。" messages = [{"role": "system", "content": system}] prompt = "" if text: prompt += f"\nText input: {text}" if images: # 使用ImageProcessor处理图像 image_bytes_list = [] for image in images: img = Image.open(image.name) image_bytes = BytesIO() img.save(image_bytes, format="PNG") image_bytes.seek(0) image_bytes_list.append(image_bytes.getvalue()) try: processed_image_result = image_processor.process_images(image_bytes_list) #prompt += f"\n乐谱的内容如下,这是一首杜维诺伊的曲子,请你根据他的曲风回答问题: {processed_image_result}" prompt += f"\n乐谱的内容如下,这是一首杜维诺伊的曲子,请你根据他的曲风回答问题: {processed_image_result}" except Exception as e: return f"Error processing image: {e}", None if audio: # 将音频转换为描述(这里简单地用占位符) prompt += "\nAudio input: A description of the audio has been generated." # 使用GPT API进行处理 try: messages.append({"role": "user", "content": prompt}) response = client.chat.completions.create( model=engine, messages=messages, temperature=0.2, max_tokens=4096, top_p=0.95, frequency_penalty=0, presence_penalty=0, stop=None ) print(response.choices[0].message.content) return response.choices[0].message.content except Exception as e: return f"Error: {e}", None # 创建Gradio接口 iface = gr.Interface( fn=process_input, inputs=[ gr.Textbox(label="Input Text", placeholder="Enter text here", lines=2), # Text input with submit button gr.File(label="Input Images", file_count="multiple", type="file"), # 支持多文件上传 # gr.Audio(label="Input Audio", type="filepath"), ], outputs=[ gr.Textbox(label="Output Text"), # gr.Audio(label="Output Audio") # 目前示例中未处理音频输出 ], live=False, ) # 启动Gradio应用 iface.launch()