|
from utils import setup_device, check_api_usage, record_chat, load_chat, get_time |
|
from SoundScribe.speakerID import find_user |
|
from SoundScribe import speak, live_listen |
|
from internet import get_weather_data |
|
import threading |
|
import warnings |
|
import time |
|
import cv2 |
|
import os |
|
|
|
# Suppress every warning category for the lifetime of the process.
warnings.filterwarnings("ignore")

# Evaluated once at import time; selects which modules provide the
# vision/language helpers used throughout this script.
USE_CLOUD_API = check_api_usage()

if not USE_CLOUD_API:
    # Helpers come from the CircumSpect and Perceptrix packages.
    from CircumSpect import (
        answer_question,
        find_object_description,
        locate_object,
    )
    from Perceptrix.engine import (
        perceptrix,
        robotix,
        identify_objects_from_text,
        search_keyword,
    )
else:
    # Same helper names, all provided by utils instead.
    from utils import (
        answer_question,
        find_object_description,
        locate_object,
        perceptrix,
        robotix,
        identify_objects_from_text,
        search_keyword,
    )
|
|
|
|
|
# Command markers. output() checks whether one of these appears in the first
# line of each section of a model reply to decide how to handle that section.
SPEECH_COMMAND = "Speak"
VQA_COMMAND = "VQA"
ROBOT_COMMAND = "Robot"
INTERNET_COMMAND = "Internet"
CLI_COMMAND = "CLI"
NOTE_COMMAND = "Note"
# NOTE(review): the name is misspelled ("COMAND") and is not referenced in the
# visible code; left unchanged in case other modules import it.
AUTOMATION_COMAND = "Home Automation"

# Latest weather summary; rewritten periodically by get_weather().
weather = None

device = setup_device()
_device_name = str(device)
if _device_name == "cpu":
    # The trailing empty argument keeps the original output (including its
    # trailing space) unchanged.
    print("INITIALIZING CRYSTAL -", "Running on", _device_name.upper(), "")
else:
    print("INITIALIZING CRYSTAL -", "Running with", _device_name.upper(),
          "Acceleration")

# Capture device at index 0, read by camera().
cap = cv2.VideoCapture(0)

# Shared state used by the worker functions and the main loop.
recognized_text = ""
current_events = ""
keyword = ""
|
|
|
|
|
def camera():
    """Continuously grab frames from ``cap`` and save the newest one to disk.

    Runs forever and never returns. Each successfully captured frame
    overwrites ``./database/current_frame.jpg``.
    """
    while True:
        grabbed, frame = cap.read()
        if not grabbed:
            # read() reports failure through its first return value and gives
            # no usable image; writing that would raise inside cv2.imwrite
            # and end this loop. Wait briefly and try again instead.
            time.sleep(0.1)
            continue
        cv2.imwrite("./database/current_frame.jpg", frame)
|
|
|
|
|
def understand_surroundings():
    """Refresh the global ``current_events`` scene description forever.

    Every cycle asks ``answer_question`` to describe the image stored at
    ``./database/current_frame.jpg``, stores the answer in ``current_events``,
    then sleeps for 10 seconds. Never returns.
    """
    global current_events

    question = "Describe your surroundings"
    frame_path = "./database/current_frame.jpg"

    while True:
        current_events = answer_question(question, frame_path)
        time.sleep(10)
|
|
|
|
|
def note(text):
    """Replace the contents of ``database/notes.txt`` with ``text``.

    The file is opened in write mode, so anything previously stored in it
    is discarded.
    """
    with open("database/notes.txt", "w") as notes_file:
        notes_file.write(text)
|
|
|
|
|
def output(response, input_text):
    """Carry out every command section contained in a model response.

    ``response`` is split on the ``<###CRYSTAL-INTERNAL###>`` separator. The
    first line of each section is matched against the command markers and the
    remainder of the section is that command's body. Sections that produce
    extra information (VQA answers, web results) accumulate it in a "helper"
    text; when any was produced, it is recorded, appended to the response and
    sent back to ``perceptrix``, and the new response is processed in turn.

    Args:
        response: Raw model reply containing zero or more command sections.
        input_text: The prompt that produced ``response``. The text before
            its first ":" is recorded as the speaker and the rest as the
            message.
    """
    # Same location the rest of this file uses for the latest camera frame.
    image_file = "./database/current_frame.jpg"

    # Pass the message as a single string, matching the "Helper" call below.
    # NOTE(review): record_chat's signature is not visible here - confirm it
    # expects a string rather than the list that was passed previously.
    speaker, _, message = input_text.partition(":")
    record_chat(speaker, message)

    while True:
        helper = ""

        for section in response.split("<###CRYSTAL-INTERNAL###>"):
            command, _, body = section.partition("\n")

            if SPEECH_COMMAND in command:
                speak(body)

            elif VQA_COMMAND in command:
                helper += answer_question(body, image_file)

            elif ROBOT_COMMAND in command:
                description = find_object_description(body, image_file)
                all_objects = identify_objects_from_text(description)
                objects_list = locate_object(all_objects, image_file)
                robot_code = robotix(f"#{input_text}\nobjects={objects_list}")

                robot_source = "import robot\n\n" + robot_code
                with open("robot_control.py", "w") as robot_file:
                    robot_file.write(robot_source)
                # Execute the source string itself: reading the file back
                # right after writing starts at end-of-file and yields "".
                # SECURITY(review): runs model-generated code unsandboxed.
                exec(robot_source)

            elif INTERNET_COMMAND in command:
                web_source = "import crystal_internet\n\n" + body
                with open("extract_internet.py", "w") as web_file:
                    web_file.write(web_source)
                # SECURITY(review): runs model-generated code unsandboxed.
                exec(web_source)

                with open("./database/web_results.txt", 'r') as web_results:
                    helper += web_results.read()

            elif CLI_COMMAND in command:
                # SECURITY(review): runs a model-generated shell command.
                os.system(body)

            elif NOTE_COMMAND in command:
                note(body)

        if not helper:
            break

        # Feed the gathered information back to the model and handle its
        # follow-up reply; iterating here avoids recording the user input
        # again on every round.
        record_chat("Helper", helper)
        response = str(perceptrix(response+"\nHelper: "+helper))
|
|
|
|
|
def get_weather():
    """Keep the global ``weather`` summary up to date forever.

    Every cycle calls ``get_weather_data``, reads its ``location``,
    ``temperature``, ``details`` and ``name`` entries, stores a one-line
    summary built from them in ``weather``, then sleeps for 90 seconds.
    Never returns.
    """
    global weather

    fields = ("location", "temperature", "details", "name")

    while True:
        report = get_weather_data()
        # Entries are looked up in the order listed in ``fields``.
        location, temperature, details, name = (report[field] for field in fields)
        weather = f"{location} is {name} with {temperature} and {details}"
        time.sleep(90)
|
|
|
|
|
# Background workers. Each target loops forever, so the threads are marked as
# daemons: otherwise an exception or Ctrl-C in the main loop below would leave
# the process hanging on threads that never finish.
eyes = threading.Thread(target=camera, daemon=True)
ears = threading.Thread(target=live_listen, daemon=True)
weather_thread = threading.Thread(target=get_weather, daemon=True)
visual_processing = threading.Thread(target=understand_surroundings, daemon=True)

ears.start()
eyes.start()
weather_thread.start()
visual_processing.start()
|
|
|
|
|
# Main loop: poll the speech transcript file and the latest scene description,
# ask the model for a reply when either is present, and act on that reply.
while True:
    with open("database/notes.txt", "r") as notes_file:
        notes = notes_file.read()
    input_text = f"Time- {get_time()}\nWeather- {weather}\nSurroundings- {current_events}"+(f"\nNotes- {notes}" if notes else "")

    with open("./database/input.txt", 'r') as recognized:
        recognized_text = recognized.read()

    # Reset on every pass so a reply is only acted on in the pass that
    # produced it. Previously an idle first pass raised NameError and later
    # idle passes replayed the previous reply.
    response = None

    if recognized_text:
        # Only needed when there is speech to answer, so not done while idle.
        full_history = load_chat()
        username = find_user("recording.wav")

        print("CRYSTAL heard:", recognized_text, "from", username)
        input_text += "\nPrompt: "+recognized_text
        keyword = search_keyword(input_text)

        if keyword.startswith("Search Keyword"):
            # Drop the whitespace after the colon so substring matching works.
            keyword = keyword.split(":")[-1].strip()
        elif keyword.startswith("Chat Index"):
            try:
                keyword = int(keyword.split(":")[-1])
            except ValueError:
                # Malformed index: fall back to recent history only.
                keyword = ""

        if isinstance(keyword, int):
            try:
                relevant_history = full_history[keyword]
            except IndexError:
                # Index outside the stored history: recent history only.
                relevant_history = ""
        elif keyword:
            relevant_history = "\n".join(
                chat for chat in full_history if keyword in chat)
        else:
            # An empty keyword would match every stored chat; skip the search.
            relevant_history = ""

        # Always include the three most recent chats.
        relevant_history += "\n"+"\n".join(full_history[-3:])

        relevant_history = f"{relevant_history}\n{username}: " + \
            "\n" + input_text + "\nCRYSTAL: "
        response = str(perceptrix(relevant_history))

        # Mark the transcript as consumed.
        with open("./database/input.txt", 'w') as clearfile:
            clearfile.write("")

    if current_events != "":
        print("CRYSTAL sees:", current_events)
        # The scene description is already part of input_text, so a reply to
        # a spoken prompt covers it; only query the model again when there
        # was no speech, instead of discarding the reply computed above.
        if response is None:
            response = str(perceptrix(input_text))
        current_events = ""

    if response is None:
        # Nothing to do this pass; pause briefly instead of spinning.
        time.sleep(0.1)
        continue

    output(response, input_text)
|
|