CRYSTAL-R1 / utils.py
crystal-technologies's picture
Upload 1671 files
38057e4
raw
history blame
5.11 kB
# from SoundScribe.speakerID import find_user
import datetime
import requests
import torch
import json
import cv2
import os
# Base URL of the CRYSTAL cloud API. Endpoint names are appended to it by
# plain string concatenation (e.g. API_URL + "perceptrix"), so the value
# must keep its trailing slash.
# NOTE(review): the host looks like an ngrok tunnel address, which may be
# short-lived -- confirm it is still current before relying on it.
API_URL = 'https://bceb7f41087d-7754001953109090881.ngrok-free.app/'
def get_time():
    """Return the current local date and time as a formatted string.

    The layout is abbreviated weekday, day of month, abbreviated month,
    four-digit year, then 12-hour clock time with an AM/PM marker.
    """
    layout = '%a %d %b %Y %I:%M %p'
    moment = datetime.datetime.now()
    return moment.strftime(layout)
def load_chat():
    """Load the chat history and group it into conversation exchanges.

    Reads ``./database/chat_history.jsonl`` (one JSON object per line with
    the keys ``"ID"`` and ``"message"``) and processes it in two passes:

    1. Consecutive messages from the same ID are merged into one entry of
       the form ``"<ID>: <message>\\n<message>\\n..."``.
    2. Entries that start with ``"CRYSTAL: "`` or ``"Helper: "`` are
       appended (after a blank line) to the entry that precedes them, so
       each returned item is one exchange opened by another speaker.

    Blank lines in the file are skipped. If the history opens with a
    CRYSTAL/Helper entry there is nothing to attach it to, so it starts
    its own item instead of raising ``IndexError``.

    Returns:
        list[str]: the grouped exchanges, in file order.
    """
    merged_turns = []
    previous_role = None

    with open('./database/chat_history.jsonl', 'r', encoding='utf-8') as history:
        for line in history:
            # Tolerate empty lines (e.g. a trailing newline or manual edit).
            if not line.strip():
                continue

            chat_message = json.loads(line)
            role = chat_message['ID']
            message = chat_message['message']

            # Checking for an empty list (rather than relying on a sentinel
            # role value) guarantees the first entry always starts a turn.
            if not merged_turns or role != previous_role:
                merged_turns.append(f"{role}: {message}\n")
            else:
                merged_turns[-1] += message + "\n"
            previous_role = role

    grouped = []
    for chat in merged_turns:
        is_reply = chat.startswith(("CRYSTAL: ", "Helper: "))
        if is_reply and grouped:
            grouped[-1] += "\n" + chat
        else:
            grouped.append(chat)

    return grouped
def record_chat(role, message):
    """Append one entry to the chat history file.

    The entry is written to ``./database/chat_history.jsonl`` as a single
    JSON line with the keys ``"ID"`` (set to ``role``) and ``"message"``.

    Args:
        role: value stored under ``"ID"``.
        message: indexable value; only ``message[0]`` is stored.
            NOTE(review): presumably callers pass a sequence whose first
            item is the text -- a plain string would record only its first
            character. Confirm against callers.
    """
    entry = dict(ID=role, message=message[0])
    serialized = json.dumps(entry)
    with open('./database/chat_history.jsonl', 'a') as history_file:
        history_file.write(f"{serialized}\n")
def check_api_usage():
    """Ask the user whether to run on the cloud API or on-device.

    The function first requires a ``models`` directory in the current
    working directory. It then probes ``API_URL``; if the probe succeeds
    the user is prompted to choose between cloud and local computation.

    A connection failure or timeout during the probe is treated the same
    as a non-OK response: the "not reachable" message is printed and local
    execution is selected, instead of the network exception propagating.

    Returns:
        bool: ``True`` if the user chose the cloud API, ``False`` otherwise
        (including when the API is not reachable).

    Raises:
        RuntimeError: if the ``models`` directory does not exist.
    """
    if not os.path.isdir("models"):
        raise RuntimeError(
            "Unauthorized access! This action will be reported immediately!")

    try:
        # A timeout keeps startup from hanging forever on a dead endpoint.
        api_reachable = requests.get(API_URL, timeout=10).ok
    except requests.RequestException:
        api_reachable = False

    if not api_reachable:
        print("CRYSTAL Cloud API not reachable.")
        return False

    choice = input(
        "CRYSTAL CLOUD API HAS BEEN DETECTED.\n"
        "Would you like to:\n"
        "\t1. Use Cloud API Computing\n"
        "\t2. Use On-Device Calculations\n"
        "Enter your choice (1/2): ")
    use_cloud_api = choice == "1"

    if use_cloud_api:
        print("RUNNING ON CLOUD")
    else:
        print("RUNNING LOCALLY")
    return use_cloud_api
def perceptrix(prompt):
    """Send ``prompt`` to the ``perceptrix`` endpoint and return its reply.

    The prompt is posted as JSON under the key ``"prompt"``; the value of
    the ``"message"`` field in the JSON response is returned.
    """
    reply = requests.post(
        f"{API_URL}perceptrix",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
def robotix(prompt):
    """Send ``prompt`` to the ``robotix`` endpoint and return its reply.

    The prompt is posted as JSON under the key ``"prompt"``; the value of
    the ``"message"`` field in the JSON response is returned.
    """
    reply = requests.post(
        f"{API_URL}robotix",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
def identify_objects_from_text(prompt):
    """Send ``prompt`` to the ``identify_objects_from_text`` endpoint.

    The prompt is posted as JSON under the key ``"prompt"``; the value of
    the ``"message"`` field in the JSON response is returned.
    """
    reply = requests.post(
        f"{API_URL}identify_objects_from_text",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
def search_keyword(prompt):
    """Send ``prompt`` to the ``search_keyword`` endpoint and return its reply.

    The prompt is posted as JSON under the key ``"prompt"``; the value of
    the ``"message"`` field in the JSON response is returned.
    """
    reply = requests.post(
        f"{API_URL}search_keyword",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
def answer_question(prompt, frame):
    """Ask the ``vqa`` endpoint a question about an image.

    Args:
        prompt: question text, sent under the JSON key ``"prompt"``.
        frame: either a path string (loaded with ``cv2.imread``) or an
            image already in a form ``cv2.imencode`` accepts.

    Returns:
        The ``"message"`` field of the endpoint's JSON response.

    Raises:
        ValueError: if ``frame`` is a path that ``cv2.imread`` cannot load.
    """
    url = API_URL+"vqa"
    if isinstance(frame, str):
        image_path = frame
        frame = cv2.imread(image_path)
        # imread signals failure by returning None rather than raising;
        # fail here with a clear message instead of inside imencode.
        if frame is None:
            raise ValueError(f"Could not read image from {image_path!r}")

    # The image is JPEG-encoded and shipped as a JSON-serialisable list.
    _, image_data = cv2.imencode('.jpg', frame)
    image = image_data.tolist()
    payload = {'image': image,
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)
    return response.json()["message"]
def find_object_description(prompt, frame):
    """Query the ``object_description`` endpoint about an image.

    Args:
        prompt: text sent under the JSON key ``"prompt"``.
        frame: either a path string (loaded with ``cv2.imread``) or an
            image already in a form ``cv2.imencode`` accepts.

    Returns:
        The ``"message"`` field of the endpoint's JSON response.

    Raises:
        ValueError: if ``frame`` is a path that ``cv2.imread`` cannot load.
    """
    url = API_URL+"object_description"
    if isinstance(frame, str):
        image_path = frame
        frame = cv2.imread(image_path)
        # imread signals failure by returning None rather than raising;
        # fail here with a clear message instead of inside imencode.
        if frame is None:
            raise ValueError(f"Could not read image from {image_path!r}")

    # The image is JPEG-encoded and shipped as a JSON-serialisable list.
    _, image_data = cv2.imencode('.jpg', frame)
    image = image_data.tolist()
    payload = {'image': image,
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)
    return response.json()["message"]
def locate_object(prompt, frame):
    """Query the ``locate_object`` endpoint about an image.

    Args:
        prompt: text sent under the JSON key ``"prompt"``.
        frame: either a path string (loaded with ``cv2.imread``) or an
            image already in a form ``cv2.imencode`` accepts.

    Returns:
        tuple: ``(annotated_image, message)`` -- the ``"annotated_image"``
        and ``"message"`` fields of the endpoint's JSON response.

    Raises:
        ValueError: if ``frame`` is a path that ``cv2.imread`` cannot load.
    """
    url = API_URL+"locate_object"
    if isinstance(frame, str):
        image_path = frame
        frame = cv2.imread(image_path)
        # imread signals failure by returning None rather than raising;
        # fail here with a clear message instead of inside imencode.
        if frame is None:
            raise ValueError(f"Could not read image from {image_path!r}")

    # The image is JPEG-encoded and shipped as a JSON-serialisable list.
    _, image_data = cv2.imencode('.jpg', frame)
    image = image_data.tolist()
    payload = {'image': image,
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)

    # Decode the response body once and read both fields from it.
    result = response.json()
    return result["annotated_image"], result["message"]
def setup_device():
    """Pick the torch device to run on.

    Preference order: ``"mps"`` if the MPS backend is available, otherwise
    ``"cuda"`` if CUDA is available, otherwise ``"cpu"``.

    Returns:
        torch.device: the selected device.
    """
    if torch.backends.mps.is_available():
        return torch.device("mps")
    backend = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(backend)
def transcribe(audio):
    """Upload an audio file to the ``transcribe`` endpoint and save the text.

    The file at path ``audio`` is posted as a multipart upload under the
    field ``"audio"``. The ``"message"`` field of the JSON response is
    printed and, unless the speaker is ``"Crystal"``, written to
    ``./database/input.txt`` with its first character dropped.

    Args:
        audio: path of the audio file to upload.

    Returns:
        tuple: ``(transcription, user)`` -- the full transcription string
        and the speaker name.
    """
    endpoint = f"{API_URL}transcribe"
    with open(audio, 'rb') as stream:
        reply = requests.post(endpoint, files={'audio': (audio, stream)})

    transcription = reply.json()["message"]
    print(transcription)

    # Speaker identification is not wired in here; the name is fixed.
    user = "Vatsal"
    if user != "Crystal":
        with open('./database/input.txt', 'w', encoding="utf-8") as sink:
            # NOTE(review): the first character is skipped -- presumably a
            # leading space in the transcription; confirm.
            sink.write(transcription[1:])
    return transcription, user