# CRYSTAL-R1 — crystal.py
# (Hugging Face file-viewer metadata, preserved as a comment so the file
#  parses: uploader crystal-technologies, "Upload 1671 files",
#  commit 38057e4, 5.66 kB)
from utils import setup_device, check_api_usage, record_chat, load_chat, get_time
# from SoundScribe.speakerID import find_user
from SoundScribe import speak, live_listen
from internet import get_weather_data
import threading
import warnings
import time
import cv2
import os
warnings.filterwarnings("ignore")

# Route the model/vision helpers through the cloud API when configured;
# otherwise fall back to the local CircumSpect / Perceptrix implementations.
# Both branches expose the same function names, so the rest of the file is
# agnostic to which backend was chosen.
USE_CLOUD_API = check_api_usage()
if USE_CLOUD_API:
    from utils import answer_question, find_object_description, locate_object, perceptrix, robotix, identify_objects_from_text, search_keyword
else:
    from CircumSpect import answer_question, find_object_description, locate_object
    from Perceptrix.engine import perceptrix, robotix, identify_objects_from_text, search_keyword
# Command tags the model embeds in its responses; output() matches these
# against the first line of each "<###CRYSTAL-INTERNAL###>" segment.
SPEECH_COMMAND = "Speak"
VQA_COMMAND = "VQA"
ROBOT_COMMAND = "Robot"
INTERNET_COMMAND = "Internet"
CLI_COMMAND = "CLI"
NOTE_COMMAND = "Note"
# NOTE(review): misspelled ("COMAND") and unused in this file; renaming
# could break external importers, so it is left as-is.
AUTOMATION_COMAND = "Home Automation"

weather = None  # latest human-readable weather summary (set by get_weather)
device = setup_device()  # compute device for local models (e.g. "cpu"/"cuda")
print("INITIALIZING CRYSTAL - DETECTED DEVICE:", str(device).upper(),
      "Acceleration" if str(device) != "cpu" else "")

cap = cv2.VideoCapture(0)  # default webcam; read continuously by camera()
recognized_text = ""  # last speech-to-text result read from input.txt
current_events = ""   # latest scene description (set by understand_surroundings)
keyword = ""          # history-retrieval key produced by search_keyword
def camera():
    """Continuously capture webcam frames to ./database/current_frame.jpg.

    Runs forever; intended to be launched on a background thread. Other
    components (VQA, scene description) read the latest frame from disk.
    """
    while True:
        ok, frame = cap.read()
        # BUG FIX: cap.read() returns ok=False (and frame=None) when the
        # camera is unavailable; passing None to cv2.imwrite raises and
        # kills this thread. Skip failed reads instead.
        if ok:
            cv2.imwrite("./database/current_frame.jpg", frame)
def understand_surroundings():
    """Refresh the global scene description every ten seconds.

    Repeatedly asks the VQA backend to describe the most recent webcam
    frame and stores the answer in the module-level `current_events`.
    Runs forever; intended for a background thread.
    """
    global current_events
    prompt = "Describe your surroundings"
    frame_path = "./database/current_frame.jpg"
    while True:
        current_events = answer_question(prompt, frame_path)
        time.sleep(10)
def note(text):
    """Persist *text* as the current note, replacing any previous note.

    Args:
        text: Note contents to store in database/notes.txt.

    NOTE(review): mode "w" means every call overwrites the prior note
    rather than appending — confirm that is the intended contract.
    """
    # BUG FIX: explicit encoding avoids the platform-dependent default
    # (e.g. cp1252 on Windows) mangling non-ASCII note text.
    with open("database/notes.txt", "w", encoding="utf-8") as notes:
        notes.write(text)
def output(response, input_text):
    """Dispatch CRYSTAL's model response to its command handlers.

    The model emits one or more commands separated by the
    "<###CRYSTAL-INTERNAL###>" marker. Each command's first line selects a
    handler (speech, VQA, robot control, internet, CLI, note) and the rest
    is that handler's payload. Handlers that gather information append it
    to `helper`; if any did, the model is re-queried with that extra
    context and the new response is dispatched recursively.

    Args:
        response: Raw model output containing command markers.
        input_text: The prompt context that produced `response`; the text
            before the first ':' is logged as the speaker name.
    """
    commands = response.split("<###CRYSTAL-INTERNAL###>")
    helper = ""
    image_file = "current_frame.jpg"
    # NOTE(review): the second argument is a *list* of segments, not a
    # string — confirm record_chat accepts that, or join on ":".
    record_chat(input_text.split(":")[0], input_text.split(":")[1:])
    for command in commands:
        command_parts = command.split('\n', 1)
        command = command_parts[0]
        body = command_parts[1] if len(command_parts) > 1 else ""
        if SPEECH_COMMAND in command:
            speak(body)
        elif VQA_COMMAND in command:
            helper += answer_question(body, image_file)
        elif ROBOT_COMMAND in command:
            description = find_object_description(body, image_file)
            all_objects = identify_objects_from_text(description)
            objects_list = locate_object(all_objects, image_file)
            robot_code = robotix(f"#{input_text}\nobjects={objects_list}")
            code = "import robot\n\n" + robot_code
            with open("robot_control.py", "w+") as robot_file:
                robot_file.write(code)
            # BUG FIX: the original exec'd robot_file.read() immediately
            # after writing — the "w+" file pointer sits at EOF, so it
            # executed the empty string. Execute the composed code instead.
            # SECURITY: exec of model-generated code is inherently unsafe;
            # sandbox or review before enabling in production.
            exec(code)
        elif INTERNET_COMMAND in command:
            code = "import crystal_internet\n\n" + body
            with open("extract_internet.py", "w+") as web_file:
                web_file.write(code)
            # BUG FIX: same pointer-at-EOF no-op as the robot branch.
            exec(code)
            with open("./database/web_results.txt", 'r') as web_results:
                helper += web_results.read()
        elif CLI_COMMAND in command:
            # SECURITY: runs model output directly in a shell.
            os.system(body)
        elif NOTE_COMMAND in command:
            note(body)
    if helper:
        record_chat("Helper", helper)
        response = perceptrix(response + "\nHelper: " + helper)
        # BUG FIX: the recursive call omitted input_text, which raised
        # TypeError on every helper-producing command.
        output(response, input_text)
def get_weather():
    """Poll the weather service every 90 s into the global `weather` string.

    Runs forever; intended for a background thread. On failure the previous
    summary is kept and the next poll retries.
    """
    global weather
    while True:
        # BUG FIX: a single failed fetch (network down, missing key in the
        # payload) used to raise and permanently kill this thread. Keep the
        # last good value and retry on the next cycle instead.
        try:
            data = get_weather_data()
            location = data["location"]
            temperature = data["temperature"]
            details = data["details"]
            name = data["name"]
            weather = f"{location} is {name} with {temperature} and {details}"
        except Exception as error:
            print("Weather update failed:", error)
        time.sleep(90)
# Background workers: webcam capture, speech capture, weather polling, and
# periodic scene description all run concurrently with the main loop below.
eyes = threading.Thread(target=camera)
ears = threading.Thread(target=live_listen)
weather_thread = threading.Thread(target=get_weather)
visual_processing = threading.Thread(target=understand_surroundings)
# NOTE(review): none of these are daemon threads, so the process cannot exit
# while they loop forever — confirm that is intended.
ears.start()
eyes.start()
weather_thread.start()
visual_processing.start()

# Main interaction loop: assemble context (time / weather / surroundings /
# notes), pull any recognized speech from ./database/input.txt, retrieve
# relevant chat history, query the model, and dispatch its commands.
while True:
    full_history = load_chat()
    # username = find_user("recording.wav")
    username = "Vatsal"  # speaker identification disabled; hard-coded user
    with open("database/notes.txt", "r") as notes:
        notes = notes.read()  # rebinds `notes` from file handle to its text
    input_text = f"Time- {get_time()}\nWeather- {weather}\nSurroundings- {current_events}"+(f"\nNotes- {notes}" if notes else "")
    # SoundScribe's live_listen thread writes recognized speech here.
    with open("./database/input.txt", 'r') as recognized:
        recognized_text = recognized.read()
    if recognized_text:
        print("CRYSTAL heard:", recognized_text, "from", username)
        input_text += "\nPrompt: "+recognized_text
    # search_keyword returns either "Search Keyword: <word>" (substring
    # match over history) or "Chat Index: <i>" (direct index into history).
    keyword = search_keyword(input_text)
    relevant_history = []
    # NOTE(review): notes.txt is re-read here but the result is never used
    # below — looks redundant; confirm before removing.
    with open("database/notes.txt", "r") as notes:
        notes = notes.read()
    if keyword.startswith("Search Keyword"):
        keyword = keyword.split(":")[-1]
    elif keyword.startswith("Chat Index"):
        keyword = int(keyword.split(":")[-1])
    if type(keyword) == str:
        # Substring search over the whole chat history.
        for chat in full_history:
            if keyword in chat:
                relevant_history.append(chat)
        relevant_history = "\n".join(relevant_history)
    elif type(keyword) == int:
        relevant_history = full_history[keyword]
    # Always append the three most recent exchanges for short-term context.
    relevant_history += "\n"+"\n".join(full_history[-3:])
    relevant_history = f"{relevant_history}\n{username}: " + \
        "\n" + input_text + "\nCRYSTAL: "
    response = str(perceptrix(relevant_history))
    response = "<###CRYSTAL-INTERNAL###> Speech\n"+response
    # Clear the speech buffer so the same utterance is not reprocessed.
    with open("./database/input.txt", 'w') as clearfile:
        clearfile.write("")
    if current_events != "":
        print("CRYSTAL sees:", current_events)
        # NOTE(review): this discards the history-aware response computed
        # above and re-queries with only input_text — confirm intended.
        response = str(perceptrix(input_text))
        current_events = ""
    output(response, input_text)