# from SoundScribe.speakerID import find_user import datetime import requests import torch import json import cv2 import os API_URL = 'https://bceb7f41087d-7754001953109090881.ngrok-free.app/' def get_time(): return datetime.datetime.now().strftime('%a %d %b %Y %I:%M %p') def load_chat(): full_history = [] sorted_list = [] prev_id = "" with open('./database/chat_history.jsonl', 'r') as history: for line in history: chat_message = json.loads(line) id = chat_message['ID'] message = chat_message['message'] if id != prev_id: full_history.append(f"{id}: {message}\n") else: full_history[-1] += message+"\n" prev_id = id for chat in full_history: if chat.startswith("CRYSTAL: ") or chat.startswith("Helper: "): sorted_list[-1] += "\n"+chat else: sorted_list.append(chat) return sorted_list def record_chat(role, message): new_message = { "ID": role, "message": message[0] } with open('./database/chat_history.jsonl', 'a') as history: history.write(json.dumps(new_message) + '\n') def check_api_usage(): USE_CLOUD_API = False if os.path.isdir("models"): if requests.get(API_URL).ok: choice = input( "CRYSTAL CLOUD API HAS BEEN DETECTED.\n" "Would you like to:\n" "\t1. Use Cloud API Computing\n" "\t2. Use On-Device Calculations\n" "Enter your choice (1/2): ") USE_CLOUD_API = choice == "1" if USE_CLOUD_API: print("RUNNING ON CLOUD") else: print("RUNNING LOCALLY") else: print("CRYSTAL Cloud API not reachable.") else: raise RuntimeError( "Unauthorized access! This action will be reported immediately!") return USE_CLOUD_API def perceptrix(prompt): url = API_URL+"perceptrix" payload = {'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def robotix(prompt): url = API_URL+"robotix" payload = {'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def identify_objects_from_text(prompt): url = API_URL+"identify_objects_from_text" payload = {'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def search_keyword(prompt): url = API_URL+"search_keyword" payload = {'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def answer_question(prompt, frame): url = API_URL+"vqa" if type(frame) == str: frame = cv2.imread(frame) _, image_data = cv2.imencode('.jpg', frame) image = image_data.tolist() payload = {'image': image, 'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def find_object_description(prompt, frame): url = API_URL+"object_description" if type(frame) == str: frame = cv2.imread(frame) _, image_data = cv2.imencode('.jpg', frame) image = image_data.tolist() payload = {'image': image, 'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["message"] def locate_object(prompt, frame): url = API_URL+"locate_object" if type(frame) == str: frame = cv2.imread(frame) _, image_data = cv2.imencode('.jpg', frame) image = image_data.tolist() payload = {'image': image, 'prompt': prompt} headers = {'Content-Type': 'application/json'} response = requests.post(url, json=payload, headers=headers) return response.json()["annotated_image"], response.json()["message"] def setup_device(): if torch.backends.mps.is_available(): device = torch.device("mps") elif torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") return device def transcribe(audio): url = API_URL + "transcribe" with open(audio, 'rb') as audio_file: files = {'audio': (audio, audio_file)} response = requests.post(url, files=files) transcription = response.json()["message"] print(transcription) # user = find_user("database/recording.wav") user = "Vatsal" if user != "Crystal": with open('./database/input.txt', 'w', encoding="utf-8") as write_to: write_to.write(transcription[1:]) return transcription, user