Spaces:
Runtime error
Runtime error
import os | |
import ssl | |
import time | |
from threading import Thread | |
import requests | |
from telegram import Update | |
from telegram import __version__ as TG_VER | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
ContextTypes, | |
MessageHandler, | |
filters, | |
) | |
from app_modules.init import * | |
ctx = ssl.create_default_context() | |
ctx.set_ciphers("DEFAULT") | |
try: | |
from telegram import __version_info__ | |
except ImportError: | |
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] | |
if __version_info__ < (20, 0, 0, "alpha", 1): | |
raise RuntimeError( | |
f"This example is not compatible with your current PTB version {TG_VER}. To view the " | |
f"{TG_VER} version of this example, " | |
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html" | |
) | |
TOKEN = os.getenv("TELEGRAM_API_TOKEN") | |
ENDPOINT = os.getenv("CHAT_API_URL") | |
# Define a few command handlers. These usually take the two arguments update and | |
# context. | |
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
"""Send a message when the command /start is issued.""" | |
user = update.effective_user | |
await update.message.reply_html( | |
rf"Hi {user.mention_html()}! You are welcome to ask questions on anything!", | |
) | |
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
"""Send a message when the command /help is issued.""" | |
await update.message.reply_text("Help!") | |
async def chat_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
"""Echo the user message.""" | |
print(update) | |
tic = time.perf_counter() | |
try: | |
message = { | |
"question": update.message.text, | |
"chat_id": update.message.chat.id, | |
} | |
print(message) | |
x = requests.post(ENDPOINT, json=message).json() | |
temp = time.perf_counter() | |
print(f"Received response in {temp - tic:0.4f} seconds") | |
print(x) | |
result = x["result"] | |
print(result) | |
await update.message.reply_text(result[0:8192]) | |
toc = time.perf_counter() | |
print(f"Response time in {toc - tic:0.4f} seconds") | |
except Exception as e: | |
print("error", e) | |
def start_telegram_bot() -> None: | |
"""Start the bot.""" | |
print("starting telegram bot ...") | |
# Create the Application and pass it your bot's token. | |
application = Application.builder().token(TOKEN).build() | |
# on different commands - answer in Telegram | |
application.add_handler(CommandHandler("start_command", start_command)) | |
application.add_handler(CommandHandler("help", help_command)) | |
# on non command i.e message - chat_command the message on Telegram | |
application.add_handler( | |
MessageHandler(filters.TEXT & ~filters.COMMAND, chat_command) | |
) | |
application.run_polling() | |
if __name__ == "__main__": | |
start_telegram_bot() | |