Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
import os | |
import re | |
import numpy as np | |
import pandas as pd | |
from tqdm import tqdm | |
import matplotlib.pyplot as plt | |
import warnings | |
import nltk | |
import random, time | |
import datetime | |
# nltk.download("stopwords") | |
from nltk.corpus import stopwords | |
import torch | |
import torch.nn as nn | |
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler,random_split | |
from sklearn.metrics import classification_report | |
import transformers | |
from transformers import BartForSequenceClassification, AdamW, BartTokenizer, get_linear_schedule_with_warmup, pipeline, set_seed | |
from transformers import pipeline, set_seed, BartTokenizer | |
from datasets import load_dataset, load_metric | |
from dotenv import load_dotenv | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
from nltk.tokenize import sent_tokenize | |
from datasets import Dataset, load_metric | |
import datasets | |
import gradio as gr | |
import pyperclip | |
import openai | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
from transformers import TrainingArguments, Trainer | |
# from vicuna_generate import * | |
# from convert_article import * | |
# Data preprocessing | |
def text_preprocessing(s): | |
""" | |
- Lowercase the sentence | |
- Change "'t" to "not" | |
- Remove "@name" | |
- Isolate and remove punctuations except "?" | |
- Remove other special characters | |
- Remove stop words except "not" and "can" | |
- Remove trailing whitespace | |
""" | |
s = s.lower() | |
# Change 't to 'not' | |
s = re.sub(r"\'t", " not", s) | |
# Remove @name | |
s = re.sub(r'(@.*?)[\s]', ' ', s) | |
# Isolate and remove punctuations except '?' | |
s = re.sub(r'([\'\"\.\(\)\!\?\\\/\,])', r' \1 ', s) | |
s = re.sub(r'[^\w\s\?]', ' ', s) | |
# Remove some special characters | |
s = re.sub(r'([\;\:\|•«\n])', ' ', s) | |
# Remove stopwords except 'not' and 'can' | |
s = " ".join([word for word in s.split() | |
if word not in stopwords.words('english') | |
or word in ['not', 'can']]) | |
# Remove trailing whitespace | |
s = re.sub(r'\s+', ' ', s).strip() | |
return s | |
def text_preprocessing(text): | |
""" | |
- Remove entity mentions (eg. '@united') | |
- Correct errors (eg. '&' to '&') | |
@param text (str): a string to be processed. | |
@return text (Str): the processed string. | |
""" | |
# Remove '@name' | |
text = re.sub(r'(@.*?)[\s]', ' ', text) | |
# Replace '&' with '&' | |
text = re.sub(r'&', '&', text) | |
# Remove trailing whitespace | |
text = re.sub(r'\s+', ' ', text).strip() | |
return text | |
# Total number of training steps is [number of batches] x [number of epochs]. | |
# (Note that this is not the same as the number of training samples). | |
# Create the learning rate scheduler. | |
# Function to calculate the accuracy of our predictions vs labels | |
def flat_accuracy(preds, labels): | |
pred_flat = np.argmax(preds, axis=1).flatten() | |
labels_flat = labels.flatten() | |
return np.sum(pred_flat == labels_flat) / len(labels_flat) | |
def format_time(elapsed): | |
''' | |
Takes a time in seconds and returns a string hh:mm:ss | |
''' | |
# Round to the nearest second. | |
elapsed_rounded = int(round((elapsed))) | |
# Format as hh:mm:ss | |
return str(datetime.timedelta(seconds=elapsed_rounded)) | |
def decode(paragraphs_needed): | |
# model_ckpt = "facebook/bart-large-cnn" | |
tokenizer = AutoTokenizer.from_pretrained("theQuert/NetKUp-tokenzier") | |
# pipe = pipeline("summarization", model="bart-decoder",tokenizer=tokenizer) | |
pipe = pipeline("summarization", model="hyesunyun/update-summarization-bart-large-longformer",tokenizer=tokenizer) | |
contexts = [str(pipe(paragraph)) for paragraph in paragraphs_needed] | |
return contexts | |
def split_article(article, trigger): | |
if article.split("\n"): article = article.replace("\n", "\\\\c\\\\c") | |
paragraphs = article.replace("\\c\\c", "\c\c").split("\\\\c\\\\c") | |
pars = [str(par) + " -- " + str(trigger) for par in paragraphs] | |
# pd.DataFrame({"paragraph": pars}).to_csv("./util/experiments/input_paragraphs.csv") | |
return pars | |
def config(): | |
load_dotenv() | |
def call_gpt(paragraph, trigger): | |
openai.api_key = os.environ.get("GPT_API") | |
tokenizer = BartTokenizer.from_pretrained("theQuert/NetKUp-tokenzier") | |
inputs_for_gpt = f""" | |
As an article writer, your task is to provide an updated paragraph in the length same as non-updated paragraph based on the given non-updated paragraph and a triggered news. | |
Non-updated paragraph: | |
{paragraph} | |
Triggered News: | |
{trigger} | |
""" | |
# merged_with_prompts.append(merged.strip()) | |
# pd.DataFrame({"paragraph": merged_with_prompts}).to_csv("./experiments/paragraphs_with_prompts.csv") | |
completion = openai.ChatCompletion.create( | |
model = "gpt-3.5-turbo", | |
messages = [ | |
{"role": "user", "content": inputs_for_gpt} | |
] | |
) | |
response = completion.choices[0].message.content | |
return str(response) | |
def call_vicuna(paragraphs_tirgger): | |
tokenizer = BartTokenizer.from_pretrained("theQuert/NetKUp-tokenzier") | |
merged_with_prompts = [] | |
for paragraph in paragraphs: | |
merged = f""" | |
As an article writer, your task is to provide an updated paragraph in the length same as non-updated paragraph based on the given non-updated paragraph and a triggered news. | |
Non-updated paragraph: | |
{paragraph} | |
Triggered News: | |
{trigger} | |
""" | |
merged_with_prompts.append(merged.strip()) | |
pd.DataFrame({"paragraph": merged_with_prompts}).to_csv("./util/experiments/paragraphs_with_prompts.csv") | |
responses = vicuna_output() | |
return responses | |
def main(input_article, input_trigger): | |
# csv_path = "./util/experiments/input_paragraphs.csv" | |
# if os.path.isfile(csv_path): | |
# os.remove(csv_path) | |
modified = "TRUE" | |
# device = "cuda" if torch.cuda.is_available() else "cpu" | |
device="cpu" | |
# tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn', do_lower_case=True) | |
tokenizer = AutoTokenizer.from_pretrained('theQuert/NetKUp-tokenzier') | |
batch_size = 8 | |
model = torch.load("./util/bart_model", map_location=torch.device("cpu")) | |
optimizer = AdamW(model.parameters(), | |
lr = 2e-5, | |
eps = 1e-8 | |
) | |
# split the input article to paragraphs in tmp csv format | |
data_test = split_article(input_article, input_trigger) | |
seed_val = 42 | |
random.seed(seed_val) | |
np.random.seed(seed_val) | |
torch.manual_seed(seed_val) | |
# torch.cuda.manual_seed_all(seed_val) | |
input_ids = [] | |
attention_masks = [] | |
for sent in data_test: | |
encoded_dict = tokenizer.encode_plus( | |
text_preprocessing(sent), | |
add_special_tokens = True, | |
max_length = 600, | |
pad_to_max_length = True, | |
return_attention_mask = True, | |
return_tensors = 'pt', | |
truncation=True | |
) | |
input_ids.append(encoded_dict['input_ids']) | |
attention_masks.append(encoded_dict['attention_mask']) | |
input_ids = torch.cat(input_ids, dim=0) | |
attention_masks = torch.cat(attention_masks, dim=0) | |
test_dataset = TensorDataset(input_ids, attention_masks) | |
test_dataloader = DataLoader( | |
test_dataset, | |
sampler = SequentialSampler(test_dataset), | |
batch_size = batch_size | |
) | |
# Predictions | |
predictions = [] | |
for batch in test_dataloader: | |
b_input_ids = batch[0].to(device) | |
b_input_mask = batch[1].to(device) | |
with torch.no_grad(): | |
output= model(b_input_ids, | |
attention_mask=b_input_mask) | |
logits = output.logits | |
logits = logits.detach().cpu().numpy() | |
pred_flat = np.argmax(logits, axis=1).flatten() | |
predictions.extend(list(pred_flat)) | |
# Write predictions for each paragraph | |
df_output = pd.DataFrame({"target": predictions}).to_csv('./util/experiments/classification.csv', index=False) | |
if len(data_test)==1: predictions[0] = 1 | |
# extract ids for update-needed paragraphs (extract the idx with predicted target == 1) | |
pos_ids = [idx for idx in range(len(predictions)) if predictions[idx]==1] | |
neg_ids = [idx for idx in range(len(predictions)) if predictions[idx]==0] | |
# feed the positive paragraphs to decoder | |
paragraphs_needed = [data_test[idx] for idx in pos_ids] | |
pd.DataFrame({"paragraph": paragraphs_needed}).to_csv("./util/experiments/paragraphs_needed.csv", index=False) | |
# updated_paragraphs = decode(input_paragraph, input_trigger) | |
config() | |
updated_paragraphs = [call_gpt(paragraph.split(" -- ")[0], input_trigger) for paragraph in paragraphs_needed] | |
# updated_paragraphs = call_vicuna(paragraphs_needed, input_trigger) | |
# merge updated paragraphs with non-updated paragraphs | |
paragraphs_merged = data_test.copy() | |
paragraphs_merged = [str(par).split(" -- ")[0] for par in paragraphs_merged] | |
for idx in range(len(pos_ids)): | |
paragraphs_merged[pos_ids[idx]] = updated_paragraphs[idx] | |
sep = "\n" | |
# paragarphs_merged = ["".join(par.split(" -- ")[:-1]) for par in paragraphs_merged] | |
updated_article = str(sep.join(paragraphs_merged)) | |
updated_article = updated_article.replace("[{'summary_text': '", "").replace("'}]", "").strip() | |
class_res = pd.read_csv("./util/experiments/classification.csv") | |
if class_res.target.values.all() == 0: modified="False" | |
if len(data_test)==1: | |
modified="TRUE" | |
updated_article = call_gpt(input_article, input_trigger) | |
with open("./util/experiments/updated_article.txt", "w") as f: | |
f.write(updated_article) | |
# combine the predictions and paragraphs into csv format file | |
merged_par_pred_df = pd.DataFrame({"paragraphs": data_test, "predictions": predictions}).to_csv("./util/experiments/par_with_class.csv") | |
# return updated_article, modified, merged_par_pred_df | |
modified_in_all = str(len(paragraphs_needed)) + " / " + str(len(data_test)) | |
return updated_article, modified_in_all | |
def copy_to_clipboard(t): | |
with open("./util/experiments/updated_article.txt", "r") as f: | |
t = f.read() | |
pyperclip.copy(t) | |
demo = gr.Interface( | |
main, | |
[ | |
gr.Textbox( | |
lines=2, label="Non-updated Article", placeholder="Input the article..." | |
), | |
gr.Textbox( | |
lines=2, label="Triggered News Event", placeholder="Input the triggered news event..." | |
) | |
], | |
[ | |
gr.Textbox( | |
lines=25, | |
label="Output", | |
), | |
gr.Textbox( | |
lines=1, | |
label="#MODIFIED/ALL" | |
), | |
# btn = gr.Button(value="Copy Updated Article to Clipboard") | |
# btn.click(copy_to_clipboard) | |
# gr.components.Button(value="Copy Updated Article to Clipboard", fn=copy_to_clipboard), | |
], | |
title="Event Triggered Article Updating System", | |
description="Powered by YTLee", | |
) | |
demo.launch() | |