Spaces:
Running
Running
import requests | |
import time | |
from openpyxl import load_workbook | |
def read_excel(file_path):
    """Build a ``music_id -> title`` lookup from the 'Results' sheet of a workbook.

    The first row of the sheet is treated as a header and skipped. In every
    following row the first column is read as the title and the second column
    as the music id.

    Args:
        file_path: Path of the .xlsx workbook to read.

    Returns:
        dict mapping each music id to its title. Rows where either value is
        falsy (empty cell, empty string, 0) are skipped; when an id appears
        more than once, the last row wins.

    Raises:
        KeyError: if the workbook has no sheet named 'Results'.
    """
    # Honor the caller-supplied path instead of a hard-coded file name.
    workbook = load_workbook(file_path)
    sheet = workbook['Results']

    music_dict = {}
    # max_col=2 makes every row exactly (title, music_id), even on a sheet
    # that has fewer than two populated columns.
    for title, music_id in sheet.iter_rows(min_row=2, max_col=2, values_only=True):
        if music_id and title:
            music_dict[music_id] = title
    return music_dict
def match_title_from_json(excel_dict, json_data):
    """Return the title for the first music id listed in ``json_data['search_score']``.

    Args:
        excel_dict: Mapping of music id -> title.
        json_data: Mapping that may contain a 'search_score' entry. Only the
            first item produced by iterating that entry is used (the first
            element of a list, or the first key of a dict).

    Returns:
        The title mapped to the first music id, or '' when that id is not in
        ``excel_dict`` or when 'search_score' is missing, None, or empty.
    """
    # `or ()` also covers an explicit null value, which .get() would return as None.
    search_scores = json_data.get('search_score') or ()
    # Only the first id matters, so return on the first iteration instead of
    # building a full mapping; an empty iterable falls through to ''.
    for music_id in search_scores:
        return excel_dict.get(music_id, '')
    return ''
class EvaluationProcessor:
    """Client for the evaluation web API: upload a recording, then poll for its result.

    Attributes:
        api_key: Key sent in the 'API-Key' header of every request.
        audio_url / video_url: Endpoints that accept audio / video uploads.
        result_url: Endpoint polled for the evaluation result.
        headers: Headers attached to every request.
        excel_file_path: Workbook used to translate a music id into a title.
        request_timeout: Value passed as ``timeout`` to every ``requests`` call
            (None means no timeout).
    """

    DEFAULT_BASE_URL = 'https://test.aitanzou.com/web/api'

    def __init__(self, api_key, base_url=DEFAULT_BASE_URL,
                 excel_file_path='music_id.xlsx', request_timeout=None):
        """Store the credentials and derive the endpoint URLs.

        Args:
            api_key: API key for the service.
            base_url: Root of the web API; a trailing slash is ignored.
            excel_file_path: Path of the music id / title workbook.
            request_timeout: Optional timeout (seconds, or a (connect, read)
                tuple) applied to each HTTP request.
        """
        self.api_key = api_key
        base = base_url.rstrip('/')
        self.audio_url = f'{base}/submit_audio'
        self.video_url = f'{base}/submit_video'
        self.result_url = f'{base}/getEvaluationResult'
        self.headers = {
            'API-Key': self.api_key
        }
        self.excel_file_path = excel_file_path
        self.request_timeout = request_timeout

    def submit_evaluation(self, file_path, is_video=False, music_id=None, hand=-1,
                          order_start=0, order_end=-1, repeat_type=0):
        """Upload a recording for evaluation and return the id of the created task.

        Args:
            file_path: Path of the audio or video file to upload.
            is_video: When true, post to the video endpoint as 'video/mp4';
                otherwise post to the audio endpoint as 'audio/mp3'.
            music_id: Sent as the 'musicId' form field.
            hand: Sent as the 'hand' form field.
            order_start: Sent as the 'orderStart' form field.
            order_end: Sent as the 'orderEnd' form field.
            repeat_type: Sent as the 'repeatType' form field.

        Returns:
            The value of ``data.taskId`` in the JSON response.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
            Exception: if the server answers with a status other than 200.
        """
        url = self.video_url if is_video else self.audio_url
        print(url)

        # Pick the MIME type and upload file name from the media kind.
        file_type = 'video/mp4' if is_video else 'audio/mp3'
        file_name = "video_file.mp4" if is_video else "audio_file.mp3"

        form_fields = {
            'musicId': music_id,
            'hand': hand,
            'orderStart': order_start,
            'orderEnd': order_end,
            'repeatType': repeat_type
        }

        # The context manager closes the file handle even when the request fails.
        with open(file_path, 'rb') as media:
            files = {
                'file': (file_name, media, file_type)
            }
            response = requests.post(url, headers=self.headers, files=files,
                                     data=form_fields, timeout=self.request_timeout)

        if response.status_code != 200:
            raise Exception(f'Error: {response.status_code}, {response.text}')
        return response.json()['data']['taskId']

    def get_evaluation_result(self, task_id, poll_interval=2, max_wait=None):
        """Poll the result endpoint until the task finishes.

        Args:
            task_id: Task id returned by ``submit_evaluation``.
            poll_interval: Seconds to sleep between polls while the task is
                'pending' or 'processing'.
            max_wait: Optional upper bound, in seconds, on the total time spent
                waiting. None (the default) polls indefinitely.

        Returns:
            A ``(data, title)`` tuple: ``data`` is the 'data' object of the
            final response and ``title`` is what ``match_title_from_json``
            returns for it using the workbook at ``self.excel_file_path``.

        Raises:
            TimeoutError: if ``max_wait`` elapses while the task is unfinished.
            Exception: if the server answers with a status other than 200, or
                the task reports a status other than pending / processing /
                completed.
        """
        params = {'taskId': task_id}
        deadline = None if max_wait is None else time.monotonic() + max_wait

        while True:
            result_response = requests.get(self.result_url, headers=self.headers,
                                           params=params, timeout=self.request_timeout)
            if result_response.status_code != 200:
                raise Exception(f'Error: {result_response.status_code}, {result_response.text}')

            result_data = result_response.json()
            status = result_data['data']['status']

            if status in ('pending', 'processing'):
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(
                        f'Task {task_id} is still {status} after {max_wait} seconds')
                print(f'Task is still {status}...')
                time.sleep(poll_interval)
                continue

            if status == 'completed':
                title_musicid_dict = read_excel(self.excel_file_path)
                # Look up the title that matches the returned search scores.
                title = match_title_from_json(title_musicid_dict, result_data['data'])
                return result_data['data'], title

            # Fall back to the raw status so a response without 'message'
            # still produces a meaningful error instead of a KeyError.
            raise Exception(f'Task failed: {result_data.get("message", status)}')

    def process_evaluation(self, file_path, is_video, music_id=None, hand=-1,
                           order_start=0, order_end=-1, repeat_type=0):
        """Submit a recording and block until its evaluation result is available.

        Arguments are forwarded unchanged to ``submit_evaluation``; the return
        value is that of ``get_evaluation_result`` for the created task.
        """
        task_id = self.submit_evaluation(file_path, is_video, music_id, hand,
                                         order_start, order_end, repeat_type)
        return self.get_evaluation_result(task_id)