import requests import time from openpyxl import load_workbook def read_excel(file_path): # 读取Excel文件 wb = load_workbook('music_id.xlsx') # 获取指定表 ws = wb['Results'] # 创建一个music_id到title的字典 music_dict = {} # 读取数据,并填充字典,假设第一行为表头 for row in ws.iter_rows(min_row=2, values_only=True): # 从第二行开始,跳过表头 title = row[0] # 第一列为 title music_id = row[1] # 第二列为 music_id if music_id and title: # 确保数据有效 music_dict[music_id] = title return music_dict # 匹配search_score中的MusicId返回对应的Title def match_title_from_json(excel_dict, json_data): search_scores = json_data.get('search_score', []) matched_titles = {music_id: excel_dict.get(music_id, '') for music_id in search_scores} first_value = next(iter(matched_titles.values())) return first_value class EvaluationProcessor: def __init__(self, api_key): self.api_key = api_key self.audio_url = 'https://test.aitanzou.com/web/api/submit_audio' self.video_url = 'https://test.aitanzou.com/web/api/submit_video' self.result_url = 'https://test.aitanzou.com/web/api/getEvaluationResult' self.headers = { 'API-Key': self.api_key } self.excel_file_path = 'music_id.xlsx' def submit_evaluation(self, file_path, is_video=False, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0): url = self.video_url if is_video else self.audio_url print(url) # 根据文件类型选择 MIME 类型 file_type = 'video/mp4' if is_video else 'audio/mp3' file_name = "video_file.mp4" if is_video else "audio_file.mp3" # 打开文件并指定文件类型 files = { 'file': (file_name, open(file_path, 'rb'), file_type) } data = { 'musicId': music_id, 'hand': hand, 'orderStart': order_start, 'orderEnd': order_end, 'repeatType': repeat_type } response = requests.post(url, headers=self.headers, files=files, data=data) if response.status_code == 200: data = response.json() task_id = data['data']['taskId'] return task_id else: raise Exception(f'Error: {response.status_code}, {response.text}') def get_evaluation_result(self, task_id): params = {'taskId': task_id} while True: result_response = requests.get(self.result_url, headers=self.headers, params=params) #print(result_response) if result_response.status_code == 200: result_data = result_response.json() status = result_data['data']['status'] #print(result_data) if status in ['pending', 'processing']: print(f'Task is still {status}...') time.sleep(2) elif status == 'completed': title_musicid_dict = read_excel(self.excel_file_path) # 匹配并输出对应的Title title = match_title_from_json(title_musicid_dict, result_data['data']) return result_data['data'], title else: raise Exception(f'Task failed: {result_data["message"]}') else: raise Exception(f'Error: {result_response.status_code}, {result_response.text}') def process_evaluation(self, file_path, is_video, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0): task_id = self.submit_evaluation(file_path, is_video, music_id, hand, order_start, order_end, repeat_type) return self.get_evaluation_result(task_id)