File size: 3,754 Bytes
3129965 c83bd3f 3129965 c83bd3f 3129965 c83bd3f 3129965 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import torch
from typing import Dict, List, Any
import torch.nn as nn
from transformers import GPT2LMHeadModel, GPT2Config, GPT2Tokenizer, PreTrainedModel
from transformers.modeling_outputs import CausalLMOutput
import torch.nn as nn
import torch
import torch.nn.functional as F
# # get dtype
# dtype = torch.bfloat16 if torch.cuda.get_device_capability()[0] == 8 else torch.float16
class CustomGPT2Model(PreTrainedModel):
def __init__(self, config):
super(CustomGPT2Model, self).__init__(config)
self.gpt2 = GPT2LMHeadModel.from_pretrained('gpt2-medium')
# Create an MLP layer to transform the ada-002 embedding to the GPT-2 hidden size
self.mlp = nn.Sequential(
nn.Linear(1536, 768), # Adjust the hidden layer size as necessary
nn.ReLU(),
nn.Linear(768, config.n_embd) # Adjust the output size to match GPT-2 embedding size
)
def forward(self, inputs=None, ada_embedding=None, decoded_tkns=None, labels=None):
emb = self.mlp(ada_embedding)
emb = emb.unsqueeze(1)
if decoded_tkns is not None:
# Add the "encoded:" prefix, ada-002 embedding, "decoded:" prefix, and the decoded token
decoded_tkns = torch.cat([emb, self.gpt2.transformer.wte(decoded_tkns)], dim=1)
else:
decoded_tkns = emb
# Create the position ids
position_ids = torch.arange(0, decoded_tkns.size(1), dtype=torch.long).unsqueeze(0).to(emb.device)
# Forward the embeddings through the GPT-2 model with the correct position ids
outputs = self.gpt2(inputs_embeds=decoded_tkns, position_ids=position_ids)
logits = outputs.logits
loss = None
if labels is not None:
loss_fct = CrossEntropyLoss()
loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
return CausalLMOutput(loss, logits, outputs.hidden_states)
class EndpointHandler:
def __init__(self, path=""):
# load the model
# Load the GPT-2 configuration
self.config = GPT2Config.from_pretrained('gpt2-medium')
# Create the custom GPT-2 model and load the trained weights
self.model = CustomGPT2Model.from_pretrained(path, config=self.config)
# Load the tokenizer
self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2-medium')
def __call__(self, data: Any) -> List[List[Dict[str, float]]]:
embedding = data.pop("embedding", None)
ada_embedding = torch.tensor(embedding).unsqueeze(0)
max_length=200
with torch.no_grad():
outputs = self.model(ada_embedding=ada_embedding, decoded_tkns=None)
decoded_tkns = outputs.logits.argmax(dim=-1)
for _ in range(max_length):
with torch.no_grad():
outputs = self.model(ada_embedding=ada_embedding, decoded_tkns=decoded_tkns)
# Get the most likely next token, sampled from top k
logits = outputs.logits[:, -1]
top_k_logits, top_k_indices = torch.topk(logits, k = 5)
next_token = torch.multinomial(F.softmax(top_k_logits, dim=-1), num_samples=1)
next_token = top_k_indices.gather(dim=1, index=next_token)
if next_token[0].item() == self.tokenizer.eos_token_id:
break
decoded_tkns = torch.cat((decoded_tkns, next_token), dim=1)
# Convert the tensor of token IDs to a list of token IDs
token_ids = decoded_tkns[0].cpu().numpy().tolist()
# Decode the token IDs back to a string
output_text = self.tokenizer.decode(token_ids, skip_special_tokens=True)
return output_text |