|
|
|
import datetime |
|
import requests |
|
import torch |
|
import json |
|
import cv2 |
|
import os |
|
|
|
|
|
# Base URL of the CRYSTAL cloud API. Endpoint names are appended directly to
# this string, so it is normalised to end with exactly one "/".
# The tunnel address is only the default: set the CRYSTAL_API_URL environment
# variable to point at a different host without editing this file.
API_URL = (
    os.environ.get("CRYSTAL_API_URL")
    or 'https://bceb7f41087d-7754001953109090881.ngrok-free.app/'
).rstrip('/') + '/'
|
|
|
def get_time():
    """Return the current local time as text, e.g. ``Mon 01 Jan 2024 09:30 AM``.

    Returns:
        str: Abbreviated weekday, day, abbreviated month, year and 12-hour
        clock time with an AM/PM marker.
    """
    now = datetime.datetime.now()
    # Formatting a datetime through an f-string spec delegates to strftime.
    return f"{now:%a %d %b %Y %I:%M %p}"
|
|
|
def load_chat():
    """Load the stored chat log and group it into conversation turns.

    Reads ``./database/chat_history.jsonl``, which holds one JSON object per
    line with ``"ID"`` and ``"message"`` keys. Consecutive messages from the
    same ID are merged into a single ``"<ID>: <message>"`` entry (one message
    per line), and every entry spoken by ``CRYSTAL`` or ``Helper`` is then
    attached to the entry that precedes it.

    Returns:
        list[str]: One string per turn, in file order. Empty if the file
        contains no records.

    Raises:
        FileNotFoundError: If the history file does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks the ``"ID"`` or ``"message"`` key.
    """
    merged = []
    prev_speaker = None
    with open('./database/chat_history.jsonl', 'r', encoding='utf-8') as history:
        for line in history:
            # Tolerate blank lines (e.g. a trailing newline) instead of
            # letting json.loads fail on them.
            if not line.strip():
                continue

            record = json.loads(line)
            speaker = record['ID']
            message = record['message']

            # Checking `merged` first keeps the very first record from being
            # appended to a non-existent previous entry.
            if merged and speaker == prev_speaker:
                merged[-1] += message + "\n"
            else:
                merged.append(f"{speaker}: {message}\n")
            prev_speaker = speaker

    turns = []
    for entry in merged:
        # A reply is folded into the preceding entry. If the log *starts*
        # with a reply there is nothing to fold into, so it opens a turn.
        if turns and entry.startswith(("CRYSTAL: ", "Helper: ")):
            turns[-1] += "\n" + entry
        else:
            turns.append(entry)

    return turns
|
|
|
|
|
def record_chat(role, message):
    """Append one chat record to ``./database/chat_history.jsonl``.

    Args:
        role: Stored under the ``"ID"`` key of the record.
        message: Indexable value; only its first element (``message[0]``)
            is stored, under the ``"message"`` key.
    """
    # Key order (ID, then message) determines the order in the JSON text.
    entry = dict(ID=role, message=message[0])
    serialized = f"{json.dumps(entry)}\n"

    with open('./database/chat_history.jsonl', 'a') as log:
        log.write(serialized)
|
|
|
|
|
def check_api_usage():
    """Ask the user whether to run on the CRYSTAL cloud API or on-device.

    Requires a ``models`` directory in the current working directory. If the
    API base URL answers with a successful status, the user is prompted to
    choose between cloud and local computation; otherwise a notice is printed
    and local computation is selected.

    Returns:
        bool: True only when the API is reachable and the user entered ``1``.

    Raises:
        RuntimeError: If no ``models`` directory exists.
    """
    if not os.path.isdir("models"):
        raise RuntimeError(
            "Unauthorized access! This action will be reported immediately!")

    # A dead tunnel raises (or hangs) rather than returning a non-OK
    # response, so treat any request failure as "not reachable" and bound
    # the wait with a timeout.
    try:
        reachable = requests.get(API_URL, timeout=10).ok
    except requests.RequestException:
        reachable = False

    if not reachable:
        print("CRYSTAL Cloud API not reachable.")
        return False

    choice = input(
        "CRYSTAL CLOUD API HAS BEEN DETECTED.\n"
        "Would you like to:\n"
        "\t1. Use Cloud API Computing\n"
        "\t2. Use On-Device Calculations\n"
        "Enter your choice (1/2): ")

    # Ignore accidental surrounding whitespace in the typed answer.
    use_cloud_api = choice.strip() == "1"
    print("RUNNING ON CLOUD" if use_cloud_api else "RUNNING LOCALLY")
    return use_cloud_api
|
|
|
|
|
def perceptrix(prompt):
    """Send ``prompt`` to the API's ``perceptrix`` endpoint and return the reply.

    Args:
        prompt: Value sent as the ``prompt`` field of the JSON request body.

    Returns:
        The ``"message"`` field of the JSON response.
    """
    reply = requests.post(
        f"{API_URL}perceptrix",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
|
|
|
|
|
def robotix(prompt):
    """Send ``prompt`` to the API's ``robotix`` endpoint and return the reply.

    Args:
        prompt: Value sent as the ``prompt`` field of the JSON request body.

    Returns:
        The ``"message"`` field of the JSON response.
    """
    reply = requests.post(
        f"{API_URL}robotix",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
|
|
|
|
|
def identify_objects_from_text(prompt):
    """Send ``prompt`` to the ``identify_objects_from_text`` endpoint.

    Args:
        prompt: Value sent as the ``prompt`` field of the JSON request body.

    Returns:
        The ``"message"`` field of the JSON response.
    """
    reply = requests.post(
        f"{API_URL}identify_objects_from_text",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
|
|
|
|
|
def search_keyword(prompt):
    """Send ``prompt`` to the API's ``search_keyword`` endpoint and return the reply.

    Args:
        prompt: Value sent as the ``prompt`` field of the JSON request body.

    Returns:
        The ``"message"`` field of the JSON response.
    """
    reply = requests.post(
        f"{API_URL}search_keyword",
        json={'prompt': prompt},
        headers={'Content-Type': 'application/json'},
    )
    body = reply.json()
    return body["message"]
|
|
|
|
|
def answer_question(prompt, frame):
    """Send an image and a prompt to the API's ``vqa`` endpoint.

    Args:
        prompt: Value sent as the ``prompt`` field of the request body.
        frame: Either a path string, which is loaded with ``cv2.imread``, or
            an image already in a form ``cv2.imencode`` accepts.

    Returns:
        The ``"message"`` field of the JSON response.

    Raises:
        ValueError: If ``frame`` is a path OpenCV cannot read, or the image
            cannot be encoded as JPEG.
    """
    url = API_URL + "vqa"

    image = frame
    if isinstance(frame, str):
        image = cv2.imread(frame)
        # imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of inside imencode.
        if image is None:
            raise ValueError(f"Could not read image file: {frame!r}")

    encoded_ok, image_data = cv2.imencode('.jpg', image)
    if not encoded_ok:
        raise ValueError("Could not encode frame as JPEG")

    # The JPEG bytes travel as a plain list of ints inside the JSON body.
    payload = {'image': image_data.tolist(),
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)

    return response.json()["message"]
|
|
|
|
|
def find_object_description(prompt, frame):
    """Send an image and a prompt to the API's ``object_description`` endpoint.

    Args:
        prompt: Value sent as the ``prompt`` field of the request body.
        frame: Either a path string, which is loaded with ``cv2.imread``, or
            an image already in a form ``cv2.imencode`` accepts.

    Returns:
        The ``"message"`` field of the JSON response.

    Raises:
        ValueError: If ``frame`` is a path OpenCV cannot read, or the image
            cannot be encoded as JPEG.
    """
    url = API_URL + "object_description"

    image = frame
    if isinstance(frame, str):
        image = cv2.imread(frame)
        # imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of inside imencode.
        if image is None:
            raise ValueError(f"Could not read image file: {frame!r}")

    encoded_ok, image_data = cv2.imencode('.jpg', image)
    if not encoded_ok:
        raise ValueError("Could not encode frame as JPEG")

    # The JPEG bytes travel as a plain list of ints inside the JSON body.
    payload = {'image': image_data.tolist(),
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)

    return response.json()["message"]
|
|
|
|
|
def locate_object(prompt, frame):
    """Send an image and a prompt to the API's ``locate_object`` endpoint.

    Args:
        prompt: Value sent as the ``prompt`` field of the request body.
        frame: Either a path string, which is loaded with ``cv2.imread``, or
            an image already in a form ``cv2.imencode`` accepts.

    Returns:
        tuple: The ``"annotated_image"`` and ``"message"`` fields of the
        JSON response, in that order.

    Raises:
        ValueError: If ``frame`` is a path OpenCV cannot read, or the image
            cannot be encoded as JPEG.
    """
    url = API_URL + "locate_object"

    image = frame
    if isinstance(frame, str):
        image = cv2.imread(frame)
        # imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of inside imencode.
        if image is None:
            raise ValueError(f"Could not read image file: {frame!r}")

    encoded_ok, image_data = cv2.imencode('.jpg', image)
    if not encoded_ok:
        raise ValueError("Could not encode frame as JPEG")

    # The JPEG bytes travel as a plain list of ints inside the JSON body.
    payload = {'image': image_data.tolist(),
               'prompt': prompt}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, json=payload, headers=headers)

    # Decode the body once; both returned fields come from the same object.
    result = response.json()
    return result["annotated_image"], result["message"]
|
|
|
|
|
def setup_device():
    """Pick the torch device to compute on.

    Preference order is MPS, then CUDA, then CPU; each backend is only
    probed if the ones before it are unavailable.

    Returns:
        torch.device: The first available device in that order.
    """
    if torch.backends.mps.is_available():
        return torch.device("mps")
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")
|
|
|
def transcribe(audio):
    """Upload an audio file for transcription and store the resulting text.

    Posts the file at ``audio`` to the API's ``transcribe`` endpoint, prints
    the returned text, and writes that text without its first character to
    ``./database/input.txt``.

    Args:
        audio: Path of the audio file; also sent as the upload's file name.

    Returns:
        tuple: ``(transcription, user)``, where ``transcription`` is the
        ``"message"`` field of the JSON response and ``user`` is the
        hard-coded speaker name.
    """
    endpoint = f"{API_URL}transcribe"
    with open(audio, 'rb') as stream:
        # The upload must happen while the file is still open.
        response = requests.post(endpoint, files={'audio': (audio, stream)})

    transcription = response.json()["message"]
    print(transcription)

    # The speaker is hard-coded, so this check currently always passes.
    user = "Vatsal"
    if user != "Crystal":
        with open('./database/input.txt', 'w', encoding="utf-8") as sink:
            # NOTE(review): the first character is dropped, presumably a
            # leading space in the API's text -- confirm.
            sink.write(transcription[1:])
    return transcription, user
|
|