|
import os |
|
import csv |
|
import gradio as gr |
|
import itertools |
|
import random |
|
from collections import deque |
|
|
|
DATA_FILENAME = "data.csv" |
|
DATA_FILE = os.path.join("/data", DATA_FILENAME) |
|
|
|
MAX_LINES = 20 |
|
|
|
def generate_html(me: str) -> str: |
|
try: |
|
with open(DATA_FILE) as csvfile: |
|
reader = csv.reader(csvfile) |
|
rows = deque(reader, maxlen=MAX_LINES) |
|
if len(rows) == 0: |
|
return "no messages yet" |
|
else: |
|
html = "<div class='chatbot'>" |
|
for row, _ in itertools.zip_longest(rows, range(MAX_LINES), fillvalue=("", "")): |
|
username = row[0] |
|
if username == me: |
|
msg_type = "own" |
|
elif username == "[chatbot]": |
|
msg_type = "admin" |
|
elif username == "": |
|
msg_type = "empty" |
|
else: |
|
msg_type = "other" |
|
html += "<div class='message'>" |
|
if msg_type != "admin": |
|
html += f"<span class='nick'>{row[0]}</span>" |
|
html += f"<span class='{msg_type}'>{row[1]}</span>" |
|
html += "</div>" |
|
html += "</div>" |
|
return html |
|
except: |
|
return "no messages yet" |
|
|
|
|
|
def refresh(state): |
|
return generate_html(state["username"]) |
|
|
|
def store_message(writer: str, message: str, me: str): |
|
if writer and message: |
|
with open(DATA_FILE, "a") as csvfile: |
|
csv.writer(csvfile).writerow([writer, message]) |
|
|
|
return generate_html(me) |
|
|
|
anons = "alligator, anteater, armadillo, auroch, axolotl, badger, bat, bear, beaver, blobfish, buffalo, camel, chameleon, cheetah, chipmunk, chinchilla, chupacabra, cormorant, coyote, crow, dingo, dinosaur, dog, dolphin, dragon, duck, dumbo octopus, elephant, ferret, fox, frog, giraffe, goose, gopher, grizzly, hamster, hedgehog, hippo, hyena, jackal, jackalope, ibex, ifrit, iguana, kangaroo, kiwi, koala, kraken, lemur, leopard, liger, lion, llama, manatee, mink, monkey, moose, narwhal, nyan cat, orangutan, otter, panda, penguin, platypus, python, pumpkin, quagga, quokka, rabbit, raccoon, rhino, sheep, shrew, skunk, slow loris, squirrel, tiger, turtle, unicorn, walrus, wolf, wolverine, wombat" |
|
anons = anons.split(",") |
|
|
|
def login(username, state): |
|
if username == "": |
|
username = f"Anonymous {random.choice(anons).strip()}" |
|
print(username) |
|
state["username"] = username |
|
store_message("[chatbot]", f"{username} joins the chat", username) |
|
return ( |
|
state, |
|
gr.update(visible=False), |
|
gr.update(visible=True), |
|
generate_html(username), |
|
) |
|
|
|
def send_message(message, state): |
|
username = state["username"] |
|
if message != "": |
|
store_message(username, message, me=username) |
|
return (generate_html(username), "") |
|
|
|
css = """ |
|
.message {height: 28px;} |
|
.nick {font-weight:bold;} |
|
.other {background-color:cornflowerblue;color:white; padding:4px;margin:4px;border-radius:4px; } |
|
.own {background-color:lime;color:white; padding:4px;margin:4px;border-radius:4px; } |
|
.admin {background-color:orange;color:black; padding:4px;margin:4px;border-radius:4px; } |
|
""" |
|
with gr.Blocks(css=css) as demo: |
|
state = gr.Variable({ |
|
'username': None, |
|
}) |
|
|
|
gr.Markdown( |
|
""" |
|
<h1><center>Persistent Storage Chat</center></h1> |
|
<p>This is a simple demo that implements a chatroom using persistent storage.</p> |
|
""" |
|
) |
|
|
|
with gr.Group(): |
|
with (login_box := gr.Row(equal_height=True)): |
|
username = gr.Textbox( |
|
label="What's your name?", show_label=True, max_lines=1 |
|
) |
|
login_btn = gr.Button("Enter the chat") |
|
|
|
with (chat_box := gr.Box(visible=False)): |
|
with gr.Row(): |
|
output = gr.HTML(label="Chat") |
|
with gr.Row(): |
|
message = gr.Textbox( |
|
label="Your message", show_label=True, max_lines=1 |
|
) |
|
|
|
username.submit( |
|
login, |
|
inputs=[username, state], |
|
outputs=[state, login_box, chat_box, output], |
|
) |
|
login_btn.click( |
|
fn=login, |
|
inputs=[username, state], |
|
outputs=[state, login_box, chat_box, output], |
|
) |
|
|
|
message.submit( |
|
send_message, |
|
inputs=[message, state], |
|
outputs=[output, message], |
|
) |
|
demo.load(refresh, queue=True, inputs=[state], outputs=output, every=5) |
|
demo.queue().launch() |
|
|