learn-ai / telegram_bot.py
inflaton's picture
limit telegram reply len to 8192 bytes
5cc0091
raw
history blame
No virus
2.89 kB
import os
import ssl
import time
from threading import Thread
import requests
from telegram import Update
from telegram import __version__ as TG_VER
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from app_modules.init import *
# Module-level SSL context built from the platform defaults, with its cipher
# list then set to the OpenSSL "DEFAULT" cipher string.
# NOTE(review): `ctx` is not referenced anywhere else in this file — presumably
# it is consumed by an importer or is left over from an earlier revision;
# confirm before relying on it or removing it.
ctx = ssl.create_default_context()
ctx.set_ciphers("DEFAULT")
# Import-time guard: refuse to load when the installed python-telegram-bot is
# older than 20.0.0 alpha 1, since the handlers below use the v20 names
# imported at the top of this file (Application, ContextTypes, filters).
try:
    from telegram import __version_info__
except ImportError:
    # A release that does not export __version_info__ is treated as version 0
    # so that the comparison below rejects it.
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

if __version_info__ < (20, 0, 0, "alpha", 1):
    # The message points the reader at the examples matching the installed version.
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
# Bot token passed to Application.builder() in start_telegram_bot(); read from
# the TELEGRAM_API_TOKEN environment variable (None when it is unset).
TOKEN = os.getenv("TELEGRAM_API_TOKEN")
# URL that chat_command() POSTs each question to; read from the CHAT_API_URL
# environment variable (None when it is unset).
ENDPOINT = os.getenv("CHAT_API_URL")
# Define a few command handlers. These usually take the two arguments update and
# context.
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greet the sender with an HTML-formatted mention of their account.

    Args:
        update: Incoming update; its effective user is mentioned and its
            message is replied to.
        context: Handler context supplied by the application (unused).
    """
    sender = update.effective_user
    # Resolve the reply method before building the text, matching the order in
    # which a single call expression would evaluate it.
    send_html = update.message.reply_html
    await send_html(
        rf"Hi {sender.mention_html()}! You are welcome to ask questions on anything!"
    )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the incoming message with the fixed help text.

    Args:
        update: Incoming update whose message is replied to.
        context: Handler context supplied by the application (unused).
    """
    incoming = update.message
    await incoming.reply_text("Help!")
# Telegram rejects a single sendMessage call whose text exceeds 4096 characters.
_TELEGRAM_MAX_MESSAGE_LEN = 4096
# Overall cap on how much of the backend's answer is relayed to the user.
_MAX_REPLY_LEN = 8192
# Upper bound, in seconds, on one backend call so a hung request cannot stall a
# chat forever. Generous because the backend may take a long time to answer.
_CHAT_API_TIMEOUT = 300


async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Forward the user's text to the chat backend and reply with its answer.

    The message text and chat id are POSTed as JSON to ``ENDPOINT``; the
    ``"result"`` field of the JSON response is sent back to the chat. At most
    ``_MAX_REPLY_LEN`` characters are relayed, split into messages of at most
    ``_TELEGRAM_MAX_MESSAGE_LEN`` characters each so Telegram accepts them.

    Any failure (network error, HTTP error status, malformed response,
    Telegram API error) is printed and otherwise swallowed, so a bad request
    never takes the bot down.

    Args:
        update: Incoming update carrying the user's text message.
        context: Handler context supplied by the application (unused).
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    import asyncio

    print(update)
    # effective_message also covers edited messages, for which update.message
    # is None.
    incoming = update.effective_message
    if incoming is None:
        return

    tic = time.perf_counter()
    try:
        message = {
            "question": incoming.text,
            "chat_id": incoming.chat.id,
        }
        print(message)

        # requests is synchronous: run it in a worker thread so the event loop
        # keeps serving other chats while the backend is working.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(ENDPOINT, json=message, timeout=_CHAT_API_TIMEOUT),
        )
        # Surface HTTP errors explicitly instead of failing later on the body.
        response.raise_for_status()
        x = response.json()

        temp = time.perf_counter()
        print(f"Received response in {temp - tic:0.4f} seconds")
        print(x)

        result = x["result"]
        print(result)

        answer = result[:_MAX_REPLY_LEN]
        for start in range(0, len(answer), _TELEGRAM_MAX_MESSAGE_LEN):
            await incoming.reply_text(answer[start : start + _TELEGRAM_MAX_MESSAGE_LEN])

        toc = time.perf_counter()
        print(f"Response time in {toc - tic:0.4f} seconds")
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and keep running.
        print("error", e)
def start_telegram_bot() -> None:
    """Build the bot application, register its handlers, and start polling.

    Registers ``start_command`` for ``/start`` (and the legacy
    ``/start_command`` name), ``help_command`` for ``/help``, and
    ``chat_command`` for every non-command text message, then hands control to
    ``Application.run_polling()``.
    """
    print("starting telegram bot ...")

    # Create the Application and pass it the bot's token.
    application = Application.builder().token(TOKEN).build()

    # Telegram clients send "/start" when a user first opens the bot, so it
    # must be handled; "/start_command" is kept so the previously registered
    # name continues to work.
    application.add_handler(
        CommandHandler(["start", "start_command"], start_command)
    )
    application.add_handler(CommandHandler("help", help_command))

    # Any text that is not a command is treated as a question for the backend.
    application.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command)
    )

    application.run_polling()
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    start_telegram_bot()