import requests import time class EvaluationProcessor: def __init__(self, api_key): self.api_key = api_key self.audio_url = 'https://test.aitanzou.com/web/api/submit_audio' self.video_url = 'https://test.aitanzou.com/web/api/submit_video' self.result_url = 'https://test.aitanzou.com/web/api/getEvaluationResult' self.headers = { 'API-Key': self.api_key } def submit_evaluation(self, file_path, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0, is_video=False): url = self.video_url if is_video else self.audio_url files = { 'file': ("video_file.mp4", open(file_path, 'rb'), 'video/mp4') # 确保给文件指定一个名称 } data = { 'musicId': music_id, 'hand': hand, 'orderStart': order_start, 'orderEnd': order_end, 'repeatType': repeat_type } response = requests.post(url, headers=self.headers, files=files, data=data) if response.status_code == 200: data = response.json() task_id = data['data']['taskId'] return task_id else: raise Exception(f'Error: {response.status_code}, {response.text}') def get_evaluation_result(self, task_id): params = {'taskId': task_id} while True: result_response = requests.get(self.result_url, headers=self.headers, params=params) if result_response.status_code == 200: result_data = result_response.json() status = result_data['data']['status'] if status in ['pending', 'processing']: print(f'Task is still {status}...') time.sleep(2) elif status == 'completed': return result_data['data'] else: raise Exception(f'Task failed: {result_data["message"]}') else: raise Exception(f'Error: {result_response.status_code}, {result_response.text}') def process_evaluation(self, file_path, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0, is_video=False): task_id = self.submit_evaluation(file_path, music_id, hand, order_start, order_end, repeat_type, is_video) return self.get_evaluation_result(task_id)