|
from transformers import LlamaForCausalLM, LlamaTokenizer, BitsAndBytesConfig, GenerationConfig |
|
from utils import setup_device |
|
import torch |
|
import tqdm |
|
import os |
|
|
|
# Checkpoint to load: the LLM_MODEL environment variable when it is set (and
# non-empty), otherwise the local CRYSTAL-instruct directory. `model_name`
# keeps the raw environment value; `model_path` is what actually gets loaded.
model_name = os.environ.get('LLM_MODEL')
model_path = model_name or "models/CRYSTAL-instruct"

device = setup_device()

# setup_device() lives in utils and its return type is not visible from this
# file. Going through str() makes the CPU check work for a plain "cpu" string
# as well as a torch.device, and keeps the dtype choice in step with the
# quantization choice below.
_on_cpu = str(device) == "cpu"
_compute_dtype = torch.float32 if _on_cpu else torch.bfloat16

# 4-bit NF4 quantization with double quantization.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=_compute_dtype,
)

model = LlamaForCausalLM.from_pretrained(
    # Use the resolved path, not the raw environment value, so the default
    # checkpoint is loaded when LLM_MODEL is unset.
    model_path,
    torch_dtype=_compute_dtype,
    device_map="auto",
    trust_remote_code=True,
    low_cpu_mem_usage=True,
    offload_folder="offloads",
    # The quantization config is only supplied when not running on CPU.
    quantization_config=None if _on_cpu else bnb_config,
)

tokenizer = LlamaTokenizer.from_pretrained(
    model_path,
    trust_remote_code=True,
    use_fast=True,
)

# Prompt template: first slot is the instruction text, second is the input.
PROMPT = '''### Instruction:
{}
### Input:
{}
### Response:'''

# Fall back to the EOS token for padding when the tokenizer defines none.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token = tokenizer.eos_token

tokenizer.padding_side = "left"

# From here on `device` is the torch.device that generation inputs are moved
# to; it replaces the setup_device() value used for the choices above.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.eval()
|
|
|
|
|
def evaluate(
    prompt='',
    temperature=0.4,
    top_p=0.65,
    top_k=35,
    repetition_penalty=1.1,
    max_new_tokens=512,
    stream_output=False,
    **kwargs,
):
    """Generate a completion for ``prompt`` and yield the response text.

    This is a generator that yields exactly once.

    Args:
        prompt: Full prompt text to tokenize and feed to the model.
        temperature: Forwarded to ``GenerationConfig``.
        top_p: Forwarded to ``GenerationConfig``.
        top_k: Forwarded to ``GenerationConfig``.
        repetition_penalty: Forwarded to ``GenerationConfig``.
        max_new_tokens: Passed to ``model.generate``.
        stream_output: Accepted for signature compatibility; this function
            does not read it.
        **kwargs: Extra ``GenerationConfig`` fields (for example
            ``do_sample=True``).

    Yields:
        The decoded output after the last ``"### Response:"`` marker, with
        surrounding whitespace stripped.
    """
    encoded = tokenizer(prompt, return_tensors="pt")
    # Forward the attention mask together with the ids: this module falls back
    # to the EOS token for padding, and in that situation generate() warns
    # that it cannot reliably infer the mask from the ids alone.
    model_inputs = {
        key: encoded[key].to(device)
        for key in ("input_ids", "attention_mask")
        if key in encoded
    }

    generation_config = GenerationConfig(
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        **kwargs,
    )

    with torch.no_grad():
        # Per-step scores are not requested: nothing below reads them, and
        # keeping them only holds extra tensors alive for the whole call.
        generation_output = model.generate(
            **model_inputs,
            generation_config=generation_config,
            return_dict_in_generate=True,
            max_new_tokens=max_new_tokens,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )

    sequence = generation_output.sequences[0]
    output = tokenizer.decode(sequence, skip_special_tokens=True)
    # The decoded sequence still contains the prompt; keep only what follows
    # the final response marker.
    yield output.split("### Response:")[-1].strip()
|
|
|
|
|
def run_instruction(
    instructions,
    inputs,
    temperature=0.4,
    top_p=0.65,
    top_k=35,
    repetition_penalty=1.1,
    max_new_tokens=512,
    stream_output=False,
):
    """Fill the prompt template, run generation, and return the last result.

    ``instructions`` (with a newline appended) and ``inputs`` are substituted
    into ``PROMPT``; the filled prompt is handed to ``evaluate`` with sampling
    enabled. Every item produced by ``evaluate`` is printed. When
    ``stream_output`` is true the iteration is wrapped in a tqdm progress bar.

    Returns:
        The last item produced by ``evaluate``. If nothing is produced, the
        iterable itself is returned unchanged.
    """
    filled_prompt = PROMPT.format(instructions + '\n', inputs)

    chunks = evaluate(
        prompt=filled_prompt,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        max_new_tokens=max_new_tokens,
        stream_output=stream_output,
        do_sample=True,
    )
    if stream_output:
        chunks = tqdm.tqdm(chunks, unit='token')

    # Start from the iterable so an empty run hands it back as-is.
    latest = chunks
    for piece in chunks:
        print(piece)
        latest = piece
    return latest
|
|
|
|
|
def search_keyword(prompt):
    """Ask the model for a search keyword or chat index matching ``prompt``.

    The few-shot examples map a timestamped user message either to a
    ``Search Keyword:<term>`` line or to a ``Chat Index:<n>`` line. ``prompt``
    is sent as ``"Prompt:" + prompt + "\\n"`` in the input slot of the
    template.

    Args:
        prompt: The user message, in the same "Time/Weather/question" layout
            as the examples.

    Returns:
        The model's response text as returned by ``run_instruction``.
    """
    # The final example asks about "today" and is stamped 19 July, so its
    # expected keyword is that same date.
    instructions = """Prompt:Time: Fri, 23 August 2023 2:30PM\nWeather: 73F\nHow many friends have I told you about?
Search Keyword:Friends
Prompt:Time: Thu, 27 September 2023 3:41PM\nWeather: 62F\nWhat was our very first conversation
Chat Index:0
Prompt:Time: Tue, 21 September 2023 2:30PM\nWeather: 67F\nWhat was the last thing I said to you
Chat Index:-1
Prompt:Time: Sun, 31 October 2023 7:33AM\nWeather: 59F\nWhat was the last thing I said to you before that
Chat Index:-2
Prompt:Time: Sat, 30 October 2023 8:21PM\nWeather: 65F\nDid I ever tell you about my math class?
Search Keyword:math
Prompt:Time: Mon, 13 November 2023 4:52PM\nWeather: 55F\nWhat was my 7th grade English teacher's name?
Search Keyword:English
Prompt:Time: Wed, 15 May 2023 6:19PM\nWeather: 80F\nWhere did I say my wallet was?
Search Keyword:Wallet
Prompt:Time: Fri, 24 June 2023 1:52PM\nWeather: 92F\nWhat did Alex tell you?
Search Keyword:Alex
Prompt:Time: Sat, 19 July 2023 2:44PM\nWeather: 91F\nWhat was my first conversation today
Search Keyword:19 July"""
    answer = ''.join(run_instruction(
        instructions,
        "Prompt:"+prompt+"\n",
        temperature=0.5,
        top_p=0.5,
        top_k=200,
        repetition_penalty=1.1,
        max_new_tokens=256,
        stream_output=False,
    ))
    return answer
|
|
|
|
|
def identify_objects_from_text(prompt):
    """Ask the model to list the objects mentioned in ``prompt``.

    The few-shot examples pair an ``Input:`` sentence with an ``Output:``
    line naming the objects in it; ``prompt`` is sent unchanged in the input
    slot of the template.

    Returns:
        The model's response text as returned by ``run_instruction``.
    """
    few_shot = """Input:The object that flies in the air from this picture is a toy helicopter
Output:Toy helicopter
Input:For the robot to be able to achieve the task, the robot needs to look for a white shirt
Output:White shirt
Input:To complete the task, the robot needs to remove the fruits from the wooden basket.
Output:fruits, wooden basket
Input:To clean up your desk, you need to gather and organize the various items scattered around it. This includes the laptop, cell phone, scissors, pens, and other objects. By putting these items back in their designated spaces or containers, you can create a more organized and clutter-free workspace.
Output:Laptop, cell phone, scissors, pens, containers
Input:The tree with a colorful sky background is the one to be looking for.
Output:Tree"""

    # Sampling settings used for this helper.
    sampling = {
        "temperature": 0.5,
        "top_p": 0.5,
        "top_k": 200,
        "repetition_penalty": 1.1,
        "max_new_tokens": 256,
        "stream_output": False,
    }
    result = run_instruction(few_shot, prompt, **sampling)
    return ''.join(result)
|
|
|
|
|
def robotix(prompt):
    """Ask the model to turn a task description into robot control code.

    The few-shot examples pair a ``#<task>`` comment and an ``objects`` list
    of detections with calls on ``robot`` / ``distance()``; ``prompt`` is sent
    unchanged in the input slot of the template.

    Args:
        prompt: Task text, presumably in the same "#task" + "objects = [...]"
            layout as the examples (verify against the caller).

    Returns:
        The model's response text as returned by ``run_instruction``.
    """
    # Two example corrections: the table example now passes "forward" as a
    # quoted string like every other example, and the apples example targets
    # the coordinate actually listed for the apples detection.
    # NOTE(review): the examples use both "back" and "backward" as the reverse
    # direction, and the first example has no "### Response:" marker before
    # its code - confirm which forms the robot API and prompt layout expect.
    instructions = """#Get me some water
objects = [['water: 57%', (781, 592)]]
robot.target((781, 592))
object_distance = distance()
if object_distance > 10:
    robot.go("forward", object_distance, track="water")
robot.grab()
if object_distance > 10:
    robot.go("back", object_distance)
robot.release("here")
### Input:
#Stand by the table
objects = [['table: 81%', (1489, 1173)], ['table: 75%', (1971, 1293)]]
### Response:
robot.target((1489, 1173))
if distance() > 10:
    robot.go("forward", distance())
### Input:
#Put the apples in the basket
objects = [['basket: 77%', (89, 112)], ['apples: 72%', (222, 182)]]
### Response:
robot.target((222, 182))
if distance() > 10:
    robot.go("forward", distance(), track="apples")
robot.grab()
robot.target(robot.find("basket"))
robot.release(distance())
### Input:
#Go to the sofa
objects=[['sofa: 81%', (1060, 931)]]
### Response:
robot.target((1060, 931))
if distance() > 10:
    robot.go("forward", distance())
### Input:
#Go to that person over there and then come back
objects=[['person: 85%', (331, 354)]]
### Response:
robot.target((331, 354))
object_distance = distance()
if object_distance > 10:
    robot.go("forward", object_distance)
    robot.go("backward", object_distance)"""

    answer = ''.join(run_instruction(
        instructions,
        prompt,
        temperature=0.2,
        top_p=0.5,
        top_k=300,
        repetition_penalty=1.1,
        max_new_tokens=256,
        stream_output=False,
    ))
    return answer