# Page header captured together with this file (not code):
# Music_LMMs / evaluation_processor.py
# fistyee - update - d0c2b7c - raw - history blame - 3.9 kB
import requests
import time
from openpyxl import load_workbook
def read_excel(file_path):
    """Build a ``music_id -> title`` lookup from an Excel workbook.

    Reads the sheet named ``'Results'``. The first row is treated as a
    header and skipped; in every following row the first column is taken as
    the title and the second column as the music id.

    Args:
        file_path: Path of the ``.xlsx`` workbook to read.

    Returns:
        dict mapping each music id to its title. Rows where either value is
        empty/falsy are skipped; if an id occurs more than once, the last
        row wins.

    Raises:
        KeyError: If the workbook has no sheet named ``'Results'``.
    """
    # Open the workbook the caller asked for (the path used to be hard-coded,
    # which silently ignored ``file_path``).
    wb = load_workbook(file_path)
    try:
        ws = wb['Results']
        music_dict = {}
        # max_col=2 guarantees two-element rows even when the sheet has
        # fewer populated columns, so the unpacking below cannot fail.
        for title, music_id in ws.iter_rows(min_row=2, max_col=2, values_only=True):
            if music_id and title:  # keep only rows with both values present
                music_dict[music_id] = title
        return music_dict
    finally:
        # Release the underlying file handle, if any.
        wb.close()
# Look up the title for the first music id listed in ``search_score``.
def match_title_from_json(excel_dict, json_data):
    """Return the title of the first music id found in ``search_score``.

    Args:
        excel_dict: Mapping of music id -> title (as built by ``read_excel``).
        json_data: Mapping that may contain a ``'search_score'`` entry whose
            iteration yields music ids (for a dict, its keys).

    Returns:
        The title mapped to the first id in ``search_score``, or ``''`` when
        that id is unknown or when ``search_score`` is missing, null or empty.
    """
    # ``or []`` also covers an explicit null value for the key.
    search_scores = json_data.get('search_score') or []
    no_id = object()  # sentinel: distinguishes "no ids" from a falsy id
    first_id = next(iter(search_scores), no_id)
    if first_id is no_id:
        # Previously this case escaped as a bare StopIteration.
        return ''
    return excel_dict.get(first_id, '')
class EvaluationProcessor:
    """Client for the aitanzou evaluation web API.

    Uploads an audio or video file for evaluation, polls the result endpoint
    until the task finishes, and resolves a title for the result from the
    local ``music_id.xlsx`` workbook.
    """

    def __init__(self, api_key):
        """Store the API key and the endpoint URLs.

        Args:
            api_key: Value sent in the ``API-Key`` header of every request.
        """
        self.api_key = api_key
        self.audio_url = 'https://test.aitanzou.com/web/api/submit_audio'
        self.video_url = 'https://test.aitanzou.com/web/api/submit_video'
        self.result_url = 'https://test.aitanzou.com/web/api/getEvaluationResult'
        self.headers = {
            'API-Key': self.api_key
        }
        self.excel_file_path = 'music_id.xlsx'

    def submit_evaluation(self, file_path, is_video=False, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0):
        """Upload a media file and return the id of the created task.

        Args:
            file_path: Path of the audio/video file to upload.
            is_video: Selects the video endpoint and MP4 MIME type when true,
                otherwise the audio endpoint and MP3 MIME type.
            music_id: Sent as form field ``musicId``.
            hand: Sent as form field ``hand``.
            order_start: Sent as form field ``orderStart``.
            order_end: Sent as form field ``orderEnd``.
            repeat_type: Sent as form field ``repeatType``.

        Returns:
            The ``taskId`` found under ``data`` in the JSON response.

        Raises:
            Exception: If the server answers with a non-200 status code.
        """
        url = self.video_url if is_video else self.audio_url
        print(url)

        # Pick the MIME type and upload name according to the media kind.
        file_type = 'video/mp4' if is_video else 'audio/mp3'
        file_name = "video_file.mp4" if is_video else "audio_file.mp3"

        data = {
            'musicId': music_id,
            'hand': hand,
            'orderStart': order_start,
            'orderEnd': order_end,
            'repeatType': repeat_type
        }

        # The context manager closes the upload handle even when the request
        # raises; previously the file object was never closed.
        with open(file_path, 'rb') as media:
            files = {
                'file': (file_name, media, file_type)
            }
            response = requests.post(url, headers=self.headers, files=files, data=data)

        if response.status_code != 200:
            raise Exception(f'Error: {response.status_code}, {response.text}')
        return response.json()['data']['taskId']

    def get_evaluation_result(self, task_id, poll_interval=2, max_wait=None):
        """Poll the result endpoint until the task leaves the waiting states.

        Args:
            task_id: Id returned by ``submit_evaluation``.
            poll_interval: Seconds to sleep between polls (default 2).
            max_wait: Optional upper bound, in seconds, on the total time
                spent polling. ``None`` (default) polls without a limit.

        Returns:
            Tuple ``(data, title)``: the ``data`` object of the completed
            result and the title matched from the Excel workbook.

        Raises:
            TimeoutError: If ``max_wait`` is set and elapses while the task
                is still pending/processing.
            Exception: On a non-200 response, or when the task reports any
                status other than pending, processing or completed.
        """
        params = {'taskId': task_id}
        deadline = None if max_wait is None else time.monotonic() + max_wait

        while True:
            result_response = requests.get(self.result_url, headers=self.headers, params=params)
            if result_response.status_code != 200:
                raise Exception(f'Error: {result_response.status_code}, {result_response.text}')

            result_data = result_response.json()
            status = result_data['data']['status']

            if status in ('pending', 'processing'):
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(
                        f'Task {task_id} still {status} after {max_wait} seconds'
                    )
                print(f'Task is still {status}...')
                time.sleep(poll_interval)
                continue

            if status == 'completed':
                title_musicid_dict = read_excel(self.excel_file_path)
                # Match the result against the workbook to obtain the title.
                title = match_title_from_json(title_musicid_dict, result_data['data'])
                return result_data['data'], title

            # Any other status is a failure. Fall back to the status itself
            # so a missing "message" key cannot hide the failure behind a
            # KeyError.
            raise Exception(f'Task failed: {result_data.get("message", status)}')

    def process_evaluation(self, file_path, is_video, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0):
        """Submit a file for evaluation and wait for its result.

        Arguments are forwarded unchanged to ``submit_evaluation``.

        Returns:
            The ``(data, title)`` tuple from ``get_evaluation_result``.
        """
        task_id = self.submit_evaluation(file_path, is_video, music_id, hand, order_start, order_end, repeat_type)
        return self.get_evaluation_result(task_id)